001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.ByteArrayOutputStream;
025import java.io.IOException;
026import java.io.PrintStream;
027import java.util.ArrayList;
028import java.util.Arrays;
029
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.testclassification.MapReduceTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.hbase.util.LauncherSecurityManager;
040import org.apache.hadoop.mapreduce.Counter;
041import org.apache.hadoop.mapreduce.Job;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * Test the rowcounter map reduce job.
052 */
053@Category({MapReduceTests.class, LargeTests.class})
054public class TestRowCounter {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestRowCounter.class);
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestRowCounter.class);
061  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
062  private final static String TABLE_NAME = "testRowCounter";
063  private final static String TABLE_NAME_TS_RANGE = "testRowCounter_ts_range";
064  private final static String COL_FAM = "col_fam";
065  private final static String COL1 = "c1";
066  private final static String COL2 = "c2";
067  private final static String COMPOSITE_COLUMN = "C:A:A";
068  private final static int TOTAL_ROWS = 10;
069  private final static int ROWS_WITH_ONE_COL = 2;
070
071  /**
072   * @throws java.lang.Exception
073   */
074  @BeforeClass
075  public static void setUpBeforeClass() throws Exception {
076    TEST_UTIL.startMiniCluster();
077    Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
078    writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL);
079    table.close();
080  }
081
082  /**
083   * @throws java.lang.Exception
084   */
085  @AfterClass
086  public static void tearDownAfterClass() throws Exception {
087    TEST_UTIL.shutdownMiniCluster();
088  }
089
090  /**
091   * Test a case when no column was specified in command line arguments.
092   *
093   * @throws Exception
094   */
095  @Test
096  public void testRowCounterNoColumn() throws Exception {
097    String[] args = new String[] {
098      TABLE_NAME
099    };
100    runRowCount(args, 10);
101  }
102
103  /**
104   * Test a case when the column specified in command line arguments is
105   * exclusive for few rows.
106   *
107   * @throws Exception
108   */
109  @Test
110  public void testRowCounterExclusiveColumn() throws Exception {
111    String[] args = new String[] {
112      TABLE_NAME, COL_FAM + ":" + COL1
113    };
114    runRowCount(args, 8);
115  }
116
117  /**
118   * Test a case when the column specified in command line arguments is
119   * one for which the qualifier contains colons.
120   *
121   * @throws Exception
122   */
123  @Test
124  public void testRowCounterColumnWithColonInQualifier() throws Exception {
125    String[] args = new String[] {
126      TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN
127    };
128    runRowCount(args, 8);
129  }
130
131  /**
132   * Test a case when the column specified in command line arguments is not part
133   * of first KV for a row.
134   *
135   * @throws Exception
136   */
137  @Test
138  public void testRowCounterHiddenColumn() throws Exception {
139    String[] args = new String[] {
140      TABLE_NAME, COL_FAM + ":" + COL2
141    };
142    runRowCount(args, 10);
143  }
144
145
146  /**
147   * Test a case when the column specified in command line arguments is
148   * exclusive for few rows and also a row range filter is specified
149   *
150   * @throws Exception
151   */
152  @Test
153  public void testRowCounterColumnAndRowRange() throws Exception {
154    String[] args = new String[] {
155      TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1
156    };
157    runRowCount(args, 8);
158  }
159
160  /**
161   * Test a case when a range is specified with single range of start-end keys
162   * @throws Exception
163   */
164  @Test
165  public void testRowCounterRowSingleRange() throws Exception {
166    String[] args = new String[] {
167      TABLE_NAME, "--range=\\x00row1,\\x00row3"
168    };
169    runRowCount(args, 2);
170  }
171
172  /**
173   * Test a case when a range is specified with single range with end key only
174   * @throws Exception
175   */
176  @Test
177  public void testRowCounterRowSingleRangeUpperBound() throws Exception {
178    String[] args = new String[] {
179      TABLE_NAME, "--range=,\\x00row3"
180    };
181    runRowCount(args, 3);
182  }
183
184  /**
185   * Test a case when a range is specified with two ranges where one range is with end key only
186   * @throws Exception
187   */
188  @Test
189  public void testRowCounterRowMultiRangeUpperBound() throws Exception {
190    String[] args = new String[] {
191      TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7"
192    };
193    runRowCount(args, 5);
194  }
195
196  /**
197   * Test a case when a range is specified with multiple ranges of start-end keys
198   * @throws Exception
199   */
200  @Test
201  public void testRowCounterRowMultiRange() throws Exception {
202    String[] args = new String[] {
203      TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8"
204    };
205    runRowCount(args, 5);
206  }
207
208  /**
209   * Test a case when a range is specified with multiple ranges of start-end keys;
210   * one range is filled, another two are not
211   * @throws Exception
212   */
213  @Test
214  public void testRowCounterRowMultiEmptyRange() throws Exception {
215    String[] args = new String[] {
216      TABLE_NAME, "--range=\\x00row1,\\x00row3;;"
217    };
218    runRowCount(args, 2);
219  }
220
221  @Test
222  public void testRowCounter10kRowRange() throws Exception {
223    String tableName = TABLE_NAME + "10k";
224
225    try (Table table = TEST_UTIL.createTable(
226      TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) {
227      writeRows(table, 10000, 0);
228    }
229    String[] args = new String[] {
230      tableName, "--range=\\x00row9872,\\x00row9875"
231    };
232    runRowCount(args, 3);
233  }
234
235  /**
236   * Test a case when the timerange is specified with --starttime and --endtime options
237   *
238   * @throws Exception
239   */
240  @Test
241  public void testRowCounterTimeRange() throws Exception {
242    final byte[] family = Bytes.toBytes(COL_FAM);
243    final byte[] col1 = Bytes.toBytes(COL1);
244    Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
245    Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
246    Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
247
248    long ts;
249
250    // clean up content of TABLE_NAME
251    Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME_TS_RANGE), Bytes.toBytes(COL_FAM));
252
253    ts = EnvironmentEdgeManager.currentTime();
254    put1.addColumn(family, col1, ts, Bytes.toBytes("val1"));
255    table.put(put1);
256    Thread.sleep(100);
257
258    ts = EnvironmentEdgeManager.currentTime();
259    put2.addColumn(family, col1, ts, Bytes.toBytes("val2"));
260    put3.addColumn(family, col1, ts, Bytes.toBytes("val3"));
261    table.put(put2);
262    table.put(put3);
263    table.close();
264
265    String[] args = new String[] {
266      TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
267      "--starttime=" + 0,
268      "--endtime=" + ts
269    };
270    runRowCount(args, 1);
271
272    args = new String[] {
273      TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
274      "--starttime=" + 0,
275      "--endtime=" + (ts - 10)
276    };
277    runRowCount(args, 1);
278
279    args = new String[] {
280      TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
281      "--starttime=" + ts,
282      "--endtime=" + (ts + 1000)
283    };
284    runRowCount(args, 2);
285
286    args = new String[] {
287      TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
288      "--starttime=" + (ts - 30 * 1000),
289      "--endtime=" + (ts + 30 * 1000),
290    };
291    runRowCount(args, 3);
292  }
293
294  /**
295   * Run the RowCounter map reduce job and verify the row count.
296   *
297   * @param args the command line arguments to be used for rowcounter job.
298   * @param expectedCount the expected row count (result of map reduce job).
299   * @throws Exception
300   */
301  private void runRowCount(String[] args, int expectedCount) throws Exception {
302    RowCounter rowCounter = new RowCounter();
303    rowCounter.setConf(TEST_UTIL.getConfiguration());
304    args = Arrays.copyOf(args, args.length+1);
305    args[args.length-1]="--expectedCount=" + expectedCount;
306    long start = EnvironmentEdgeManager.currentTime();
307    int result = rowCounter.run(args);
308    long duration = EnvironmentEdgeManager.currentTime() - start;
309    LOG.debug("row count duration (ms): " + duration);
310    assertTrue(result==0);
311  }
312
313  /**
314   * Run the RowCounter map reduce job and verify the row count.
315   *
316   * @param args the command line arguments to be used for rowcounter job.
317   * @param expectedCount the expected row count (result of map reduce job).
318   * @throws Exception in case of any unexpected error.
319   */
320  private void runCreateSubmittableJobWithArgs(String[] args, int expectedCount) throws Exception {
321    Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args);
322    long start = EnvironmentEdgeManager.currentTime();
323    job.waitForCompletion(true);
324    long duration = EnvironmentEdgeManager.currentTime() - start;
325    LOG.debug("row count duration (ms): " + duration);
326    assertTrue(job.isSuccessful());
327    Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS);
328    assertEquals(expectedCount, counter.getValue());
329  }
330
331  @Test
332  public void testCreateSubmittableJobWithArgsNoColumn() throws Exception {
333    String[] args = new String[] {
334      TABLE_NAME
335    };
336    runCreateSubmittableJobWithArgs(args, 10);
337  }
338
339  /**
340   * Test a case when the column specified in command line arguments is
341   * exclusive for few rows.
342   *
343   * @throws Exception in case of any unexpected error.
344   */
345  @Test
346  public void testCreateSubmittableJobWithArgsExclusiveColumn() throws Exception {
347    String[] args = new String[] {
348      TABLE_NAME, COL_FAM + ":" + COL1
349    };
350    runCreateSubmittableJobWithArgs(args, 8);
351  }
352
353  /**
354   * Test a case when the column specified in command line arguments is
355   * one for which the qualifier contains colons.
356   *
357   * @throws Exception in case of any unexpected error.
358   */
359  @Test
360  public void testCreateSubmittableJobWithArgsColumnWithColonInQualifier() throws Exception {
361    String[] args = new String[] {
362      TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN
363    };
364    runCreateSubmittableJobWithArgs(args, 8);
365  }
366
367  /**
368   * Test a case when the column specified in command line arguments is not part
369   * of first KV for a row.
370   *
371   * @throws Exception in case of any unexpected error.
372   */
373  @Test
374  public void testCreateSubmittableJobWithArgsHiddenColumn() throws Exception {
375    String[] args = new String[] {
376      TABLE_NAME, COL_FAM + ":" + COL2
377    };
378    runCreateSubmittableJobWithArgs(args, 10);
379  }
380
381
382  /**
383   * Test a case when the column specified in command line arguments is
384   * exclusive for few rows and also a row range filter is specified
385   *
386   * @throws Exception in case of any unexpected error.
387   */
388  @Test
389  public void testCreateSubmittableJobWithArgsColumnAndRowRange() throws Exception {
390    String[] args = new String[] {
391      TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1
392    };
393    runCreateSubmittableJobWithArgs(args, 8);
394  }
395
396  /**
397   * Test a case when a range is specified with single range of start-end keys
398   * @throws Exception in case of any unexpected error.
399   */
400  @Test
401  public void testCreateSubmittableJobWithArgsRowSingleRange() throws Exception {
402    String[] args = new String[] {
403      TABLE_NAME, "--range=\\x00row1,\\x00row3"
404    };
405    runCreateSubmittableJobWithArgs(args, 2);
406  }
407
408  /**
409   * Test a case when a range is specified with single range with end key only
410   * @throws Exception in case of any unexpected error.
411   */
412  @Test
413  public void testCreateSubmittableJobWithArgsRowSingleRangeUpperBound() throws Exception {
414    String[] args = new String[] {
415      TABLE_NAME, "--range=,\\x00row3"
416    };
417    runCreateSubmittableJobWithArgs(args, 3);
418  }
419
420  /**
421   * Test a case when a range is specified with two ranges where one range is with end key only
422   * @throws Exception in case of any unexpected error.
423   */
424  @Test
425  public void testCreateSubmittableJobWithArgsRowMultiRangeUpperBound() throws Exception {
426    String[] args = new String[] {
427      TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7"
428    };
429    runCreateSubmittableJobWithArgs(args, 5);
430  }
431
432  /**
433   * Test a case when a range is specified with multiple ranges of start-end keys
434   * @throws Exception in case of any unexpected error.
435   */
436  @Test
437  public void testCreateSubmittableJobWithArgsRowMultiRange() throws Exception {
438    String[] args = new String[] {
439      TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8"
440    };
441    runCreateSubmittableJobWithArgs(args, 5);
442  }
443
444  /**
445   * Test a case when a range is specified with multiple ranges of start-end keys;
446   * one range is filled, another two are not
447   * @throws Exception in case of any unexpected error.
448   */
449  @Test
450  public void testCreateSubmittableJobWithArgsRowMultiEmptyRange() throws Exception {
451    String[] args = new String[] {
452      TABLE_NAME, "--range=\\x00row1,\\x00row3;;"
453    };
454    runCreateSubmittableJobWithArgs(args, 2);
455  }
456
457  @Test
458  public void testCreateSubmittableJobWithArgs10kRowRange() throws Exception {
459    String tableName = TABLE_NAME + "CreateSubmittableJobWithArgs10kRowRange";
460
461    try (Table table = TEST_UTIL.createTable(
462      TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) {
463      writeRows(table, 10000, 0);
464    }
465    String[] args = new String[] {
466      tableName, "--range=\\x00row9872,\\x00row9875"
467    };
468    runCreateSubmittableJobWithArgs(args, 3);
469  }
470
471  /**
472   * Test a case when the timerange is specified with --starttime and --endtime options
473   *
474   * @throws Exception in case of any unexpected error.
475   */
476  @Test
477  public void testCreateSubmittableJobWithArgsTimeRange() throws Exception {
478    final byte[] family = Bytes.toBytes(COL_FAM);
479    final byte[] col1 = Bytes.toBytes(COL1);
480    Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
481    Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
482    Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
483
484    long ts;
485
486    String tableName = TABLE_NAME_TS_RANGE+"CreateSubmittableJobWithArgs";
487    // clean up content of TABLE_NAME
488    Table table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM));
489
490    ts = EnvironmentEdgeManager.currentTime();
491    put1.addColumn(family, col1, ts, Bytes.toBytes("val1"));
492    table.put(put1);
493    Thread.sleep(100);
494
495    ts = EnvironmentEdgeManager.currentTime();
496    put2.addColumn(family, col1, ts, Bytes.toBytes("val2"));
497    put3.addColumn(family, col1, ts, Bytes.toBytes("val3"));
498    table.put(put2);
499    table.put(put3);
500    table.close();
501
502    String[] args = new String[] {
503      tableName, COL_FAM + ":" + COL1,
504      "--starttime=" + 0,
505      "--endtime=" + ts
506    };
507    runCreateSubmittableJobWithArgs(args, 1);
508
509    args = new String[] {
510      tableName, COL_FAM + ":" + COL1,
511      "--starttime=" + 0,
512      "--endtime=" + (ts - 10)
513    };
514    runCreateSubmittableJobWithArgs(args, 1);
515
516    args = new String[] {
517      tableName, COL_FAM + ":" + COL1,
518      "--starttime=" + ts,
519      "--endtime=" + (ts + 1000)
520    };
521    runCreateSubmittableJobWithArgs(args, 2);
522
523    args = new String[] {
524      tableName, COL_FAM + ":" + COL1,
525      "--starttime=" + (ts - 30 * 1000),
526      "--endtime=" + (ts + 30 * 1000),
527    };
528    runCreateSubmittableJobWithArgs(args, 3);
529  }
530
531  /**
532   * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
533   * two columns, Few have one.
534   *
535   * @param table
536   * @throws IOException
537   */
538  private static void writeRows(Table table, int totalRows, int rowsWithOneCol) throws IOException {
539    final byte[] family = Bytes.toBytes(COL_FAM);
540    final byte[] value = Bytes.toBytes("abcd");
541    final byte[] col1 = Bytes.toBytes(COL1);
542    final byte[] col2 = Bytes.toBytes(COL2);
543    final byte[] col3 = Bytes.toBytes(COMPOSITE_COLUMN);
544    ArrayList<Put> rowsUpdate = new ArrayList<>();
545    // write few rows with two columns
546    int i = 0;
547    for (; i < totalRows - rowsWithOneCol; i++) {
548      // Use binary rows values to test for HBASE-15287.
549      byte[] row = Bytes.toBytesBinary("\\x00row" + i);
550      Put put = new Put(row);
551      put.addColumn(family, col1, value);
552      put.addColumn(family, col2, value);
553      put.addColumn(family, col3, value);
554      rowsUpdate.add(put);
555    }
556
557    // write few rows with only one column
558    for (; i < totalRows; i++) {
559      byte[] row = Bytes.toBytes("row" + i);
560      Put put = new Put(row);
561      put.addColumn(family, col2, value);
562      rowsUpdate.add(put);
563    }
564    table.put(rowsUpdate);
565  }
566
567  /**
568   * test main method. Import should print help and call System.exit
569   */
570  @Test
571  public void testImportMain() throws Exception {
572    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
573    LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
574    System.setSecurityManager(newSecurityManager);
575    String[] args = {};
576    try {
577      try {
578        RowCounter.main(args);
579        fail("should be SecurityException");
580      } catch (SecurityException e) {
581        assertEquals(RowCounter.EXIT_FAILURE, newSecurityManager.getExitCode());
582      }
583      try {
584        args = new String[2];
585        args[0] = "table";
586        args[1] = "--range=1";
587        RowCounter.main(args);
588        fail("should be SecurityException");
589      } catch (SecurityException e) {
590        assertEquals(RowCounter.EXIT_FAILURE, newSecurityManager.getExitCode());
591      }
592
593    } finally {
594      System.setSecurityManager(SECURITY_MANAGER);
595    }
596  }
597
598  @Test
599  public void testHelp() throws Exception {
600    PrintStream oldPrintStream = System.out;
601    try {
602      ByteArrayOutputStream data = new ByteArrayOutputStream();
603      PrintStream stream = new PrintStream(data);
604      System.setOut(stream);
605      String[] args = {"-h"};
606      runRowCount(args, 0);
607      assertUsageContent(data.toString());
608      args = new String[]{"--help"};
609      runRowCount(args, 0);
610      assertUsageContent(data.toString());
611    }finally {
612      System.setOut(oldPrintStream);
613    }
614  }
615
616  @Test
617  public void testInvalidTable() throws Exception {
618    try {
619      String[] args = {"invalid"};
620      runRowCount(args, 0);
621      fail("RowCounter should had failed with invalid table.");
622    }catch (Throwable e){
623      assertTrue(e instanceof AssertionError);
624    }
625  }
626
627  private void assertUsageContent(String usage) {
628    assertTrue(usage.contains("usage: hbase rowcounter "
629      + "<tablename> [options] [<column1> <column2>...]"));
630    assertTrue(usage.contains("Options:\n"));
631    assertTrue(usage.contains("--starttime=<arg>       "
632      + "starting time filter to start counting rows from.\n"));
633    assertTrue(usage.contains("--endtime=<arg>         "
634      + "end time filter limit, to only count rows up to this timestamp.\n"));
635    assertTrue(usage.contains("--range=<arg>           "
636      + "[startKey],[endKey][;[startKey],[endKey]...]]\n"));
637    assertTrue(usage.contains("--expectedCount=<arg>   expected number of rows to be count.\n"));
638    assertTrue(usage.contains("For performance, "
639      + "consider the following configuration properties:\n"));
640    assertTrue(usage.contains("-Dhbase.client.scanner.caching=100\n"));
641    assertTrue(usage.contains("-Dmapreduce.map.speculative=false\n"));
642  }
643
644}