001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.ByteArrayOutputStream;
025import java.io.IOException;
026import java.io.PrintStream;
027import java.util.ArrayList;
028import java.util.Arrays;
029
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.testclassification.MapReduceTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.LauncherSecurityManager;
039import org.apache.hadoop.mapreduce.Counter;
040import org.apache.hadoop.mapreduce.Job;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Test the rowcounter map reduce job.
051 */
052@Category({MapReduceTests.class, LargeTests.class})
053public class TestRowCounter {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestRowCounter.class);
058
059  private static final Logger LOG = LoggerFactory.getLogger(TestRowCounter.class);
060  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
061  private final static String TABLE_NAME = "testRowCounter";
062  private final static String TABLE_NAME_TS_RANGE = "testRowCounter_ts_range";
063  private final static String COL_FAM = "col_fam";
064  private final static String COL1 = "c1";
065  private final static String COL2 = "c2";
066  private final static String COMPOSITE_COLUMN = "C:A:A";
067  private final static int TOTAL_ROWS = 10;
068  private final static int ROWS_WITH_ONE_COL = 2;
069
070  /**
071   * @throws java.lang.Exception
072   */
073  @BeforeClass
074  public static void setUpBeforeClass() throws Exception {
075    TEST_UTIL.startMiniCluster();
076    Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
077    writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL);
078    table.close();
079  }
080
081  /**
082   * @throws java.lang.Exception
083   */
084  @AfterClass
085  public static void tearDownAfterClass() throws Exception {
086    TEST_UTIL.shutdownMiniCluster();
087  }
088
089  /**
090   * Test a case when no column was specified in command line arguments.
091   *
092   * @throws Exception
093   */
094  @Test
095  public void testRowCounterNoColumn() throws Exception {
096    String[] args = new String[] {
097      TABLE_NAME
098    };
099    runRowCount(args, 10);
100  }
101
102  /**
103   * Test a case when the column specified in command line arguments is
104   * exclusive for few rows.
105   *
106   * @throws Exception
107   */
108  @Test
109  public void testRowCounterExclusiveColumn() throws Exception {
110    String[] args = new String[] {
111      TABLE_NAME, COL_FAM + ":" + COL1
112    };
113    runRowCount(args, 8);
114  }
115
116  /**
117   * Test a case when the column specified in command line arguments is
118   * one for which the qualifier contains colons.
119   *
120   * @throws Exception
121   */
122  @Test
123  public void testRowCounterColumnWithColonInQualifier() throws Exception {
124    String[] args = new String[] {
125      TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN
126    };
127    runRowCount(args, 8);
128  }
129
130  /**
131   * Test a case when the column specified in command line arguments is not part
132   * of first KV for a row.
133   *
134   * @throws Exception
135   */
136  @Test
137  public void testRowCounterHiddenColumn() throws Exception {
138    String[] args = new String[] {
139      TABLE_NAME, COL_FAM + ":" + COL2
140    };
141    runRowCount(args, 10);
142  }
143
144
145  /**
146   * Test a case when the column specified in command line arguments is
147   * exclusive for few rows and also a row range filter is specified
148   *
149   * @throws Exception
150   */
151  @Test
152  public void testRowCounterColumnAndRowRange() throws Exception {
153    String[] args = new String[] {
154      TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1
155    };
156    runRowCount(args, 8);
157  }
158
159  /**
160   * Test a case when a range is specified with single range of start-end keys
161   * @throws Exception
162   */
163  @Test
164  public void testRowCounterRowSingleRange() throws Exception {
165    String[] args = new String[] {
166      TABLE_NAME, "--range=\\x00row1,\\x00row3"
167    };
168    runRowCount(args, 2);
169  }
170
171  /**
172   * Test a case when a range is specified with single range with end key only
173   * @throws Exception
174   */
175  @Test
176  public void testRowCounterRowSingleRangeUpperBound() throws Exception {
177    String[] args = new String[] {
178      TABLE_NAME, "--range=,\\x00row3"
179    };
180    runRowCount(args, 3);
181  }
182
183  /**
184   * Test a case when a range is specified with two ranges where one range is with end key only
185   * @throws Exception
186   */
187  @Test
188  public void testRowCounterRowMultiRangeUpperBound() throws Exception {
189    String[] args = new String[] {
190      TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7"
191    };
192    runRowCount(args, 5);
193  }
194
195  /**
196   * Test a case when a range is specified with multiple ranges of start-end keys
197   * @throws Exception
198   */
199  @Test
200  public void testRowCounterRowMultiRange() throws Exception {
201    String[] args = new String[] {
202      TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8"
203    };
204    runRowCount(args, 5);
205  }
206
207  /**
208   * Test a case when a range is specified with multiple ranges of start-end keys;
209   * one range is filled, another two are not
210   * @throws Exception
211   */
212  @Test
213  public void testRowCounterRowMultiEmptyRange() throws Exception {
214    String[] args = new String[] {
215      TABLE_NAME, "--range=\\x00row1,\\x00row3;;"
216    };
217    runRowCount(args, 2);
218  }
219
220  @Test
221  public void testRowCounter10kRowRange() throws Exception {
222    String tableName = TABLE_NAME + "10k";
223
224    try (Table table = TEST_UTIL.createTable(
225      TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) {
226      writeRows(table, 10000, 0);
227    }
228    String[] args = new String[] {
229      tableName, "--range=\\x00row9872,\\x00row9875"
230    };
231    runRowCount(args, 3);
232  }
233
234  /**
235   * Test a case when the timerange is specified with --starttime and --endtime options
236   *
237   * @throws Exception
238   */
239  @Test
240  public void testRowCounterTimeRange() throws Exception {
241    final byte[] family = Bytes.toBytes(COL_FAM);
242    final byte[] col1 = Bytes.toBytes(COL1);
243    Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
244    Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
245    Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
246
247    long ts;
248
249    // clean up content of TABLE_NAME
250    Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME_TS_RANGE), Bytes.toBytes(COL_FAM));
251
252    ts = System.currentTimeMillis();
253    put1.addColumn(family, col1, ts, Bytes.toBytes("val1"));
254    table.put(put1);
255    Thread.sleep(100);
256
257    ts = System.currentTimeMillis();
258    put2.addColumn(family, col1, ts, Bytes.toBytes("val2"));
259    put3.addColumn(family, col1, ts, Bytes.toBytes("val3"));
260    table.put(put2);
261    table.put(put3);
262    table.close();
263
264    String[] args = new String[] {
265      TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
266      "--starttime=" + 0,
267      "--endtime=" + ts
268    };
269    runRowCount(args, 1);
270
271    args = new String[] {
272      TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
273      "--starttime=" + 0,
274      "--endtime=" + (ts - 10)
275    };
276    runRowCount(args, 1);
277
278    args = new String[] {
279      TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
280      "--starttime=" + ts,
281      "--endtime=" + (ts + 1000)
282    };
283    runRowCount(args, 2);
284
285    args = new String[] {
286      TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
287      "--starttime=" + (ts - 30 * 1000),
288      "--endtime=" + (ts + 30 * 1000),
289    };
290    runRowCount(args, 3);
291  }
292
293  /**
294   * Run the RowCounter map reduce job and verify the row count.
295   *
296   * @param args the command line arguments to be used for rowcounter job.
297   * @param expectedCount the expected row count (result of map reduce job).
298   * @throws Exception
299   */
300  private void runRowCount(String[] args, int expectedCount) throws Exception {
301    RowCounter rowCounter = new RowCounter();
302    rowCounter.setConf(TEST_UTIL.getConfiguration());
303    args = Arrays.copyOf(args, args.length+1);
304    args[args.length-1]="--expectedCount=" + expectedCount;
305    long start = System.currentTimeMillis();
306    int result = rowCounter.run(args);
307    long duration = System.currentTimeMillis() - start;
308    LOG.debug("row count duration (ms): " + duration);
309    assertTrue(result==0);
310  }
311
312  /**
313   * Run the RowCounter map reduce job and verify the row count.
314   *
315   * @param args the command line arguments to be used for rowcounter job.
316   * @param expectedCount the expected row count (result of map reduce job).
317   * @throws Exception in case of any unexpected error.
318   */
319  private void runCreateSubmittableJobWithArgs(String[] args, int expectedCount) throws Exception {
320    Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args);
321    long start = System.currentTimeMillis();
322    job.waitForCompletion(true);
323    long duration = System.currentTimeMillis() - start;
324    LOG.debug("row count duration (ms): " + duration);
325    assertTrue(job.isSuccessful());
326    Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS);
327    assertEquals(expectedCount, counter.getValue());
328  }
329
330  @Test
331  public void testCreateSubmittableJobWithArgsNoColumn() throws Exception {
332    String[] args = new String[] {
333      TABLE_NAME
334    };
335    runCreateSubmittableJobWithArgs(args, 10);
336  }
337
338  /**
339   * Test a case when the column specified in command line arguments is
340   * exclusive for few rows.
341   *
342   * @throws Exception in case of any unexpected error.
343   */
344  @Test
345  public void testCreateSubmittableJobWithArgsExclusiveColumn() throws Exception {
346    String[] args = new String[] {
347      TABLE_NAME, COL_FAM + ":" + COL1
348    };
349    runCreateSubmittableJobWithArgs(args, 8);
350  }
351
352  /**
353   * Test a case when the column specified in command line arguments is
354   * one for which the qualifier contains colons.
355   *
356   * @throws Exception in case of any unexpected error.
357   */
358  @Test
359  public void testCreateSubmittableJobWithArgsColumnWithColonInQualifier() throws Exception {
360    String[] args = new String[] {
361      TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN
362    };
363    runCreateSubmittableJobWithArgs(args, 8);
364  }
365
366  /**
367   * Test a case when the column specified in command line arguments is not part
368   * of first KV for a row.
369   *
370   * @throws Exception in case of any unexpected error.
371   */
372  @Test
373  public void testCreateSubmittableJobWithArgsHiddenColumn() throws Exception {
374    String[] args = new String[] {
375      TABLE_NAME, COL_FAM + ":" + COL2
376    };
377    runCreateSubmittableJobWithArgs(args, 10);
378  }
379
380
381  /**
382   * Test a case when the column specified in command line arguments is
383   * exclusive for few rows and also a row range filter is specified
384   *
385   * @throws Exception in case of any unexpected error.
386   */
387  @Test
388  public void testCreateSubmittableJobWithArgsColumnAndRowRange() throws Exception {
389    String[] args = new String[] {
390      TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1
391    };
392    runCreateSubmittableJobWithArgs(args, 8);
393  }
394
395  /**
396   * Test a case when a range is specified with single range of start-end keys
397   * @throws Exception in case of any unexpected error.
398   */
399  @Test
400  public void testCreateSubmittableJobWithArgsRowSingleRange() throws Exception {
401    String[] args = new String[] {
402      TABLE_NAME, "--range=\\x00row1,\\x00row3"
403    };
404    runCreateSubmittableJobWithArgs(args, 2);
405  }
406
407  /**
408   * Test a case when a range is specified with single range with end key only
409   * @throws Exception in case of any unexpected error.
410   */
411  @Test
412  public void testCreateSubmittableJobWithArgsRowSingleRangeUpperBound() throws Exception {
413    String[] args = new String[] {
414      TABLE_NAME, "--range=,\\x00row3"
415    };
416    runCreateSubmittableJobWithArgs(args, 3);
417  }
418
419  /**
420   * Test a case when a range is specified with two ranges where one range is with end key only
421   * @throws Exception in case of any unexpected error.
422   */
423  @Test
424  public void testCreateSubmittableJobWithArgsRowMultiRangeUpperBound() throws Exception {
425    String[] args = new String[] {
426      TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7"
427    };
428    runCreateSubmittableJobWithArgs(args, 5);
429  }
430
431  /**
432   * Test a case when a range is specified with multiple ranges of start-end keys
433   * @throws Exception in case of any unexpected error.
434   */
435  @Test
436  public void testCreateSubmittableJobWithArgsRowMultiRange() throws Exception {
437    String[] args = new String[] {
438      TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8"
439    };
440    runCreateSubmittableJobWithArgs(args, 5);
441  }
442
443  /**
444   * Test a case when a range is specified with multiple ranges of start-end keys;
445   * one range is filled, another two are not
446   * @throws Exception in case of any unexpected error.
447   */
448  @Test
449  public void testCreateSubmittableJobWithArgsRowMultiEmptyRange() throws Exception {
450    String[] args = new String[] {
451      TABLE_NAME, "--range=\\x00row1,\\x00row3;;"
452    };
453    runCreateSubmittableJobWithArgs(args, 2);
454  }
455
456  @Test
457  public void testCreateSubmittableJobWithArgs10kRowRange() throws Exception {
458    String tableName = TABLE_NAME + "CreateSubmittableJobWithArgs10kRowRange";
459
460    try (Table table = TEST_UTIL.createTable(
461      TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) {
462      writeRows(table, 10000, 0);
463    }
464    String[] args = new String[] {
465      tableName, "--range=\\x00row9872,\\x00row9875"
466    };
467    runCreateSubmittableJobWithArgs(args, 3);
468  }
469
470  /**
471   * Test a case when the timerange is specified with --starttime and --endtime options
472   *
473   * @throws Exception in case of any unexpected error.
474   */
475  @Test
476  public void testCreateSubmittableJobWithArgsTimeRange() throws Exception {
477    final byte[] family = Bytes.toBytes(COL_FAM);
478    final byte[] col1 = Bytes.toBytes(COL1);
479    Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
480    Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
481    Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
482
483    long ts;
484
485    String tableName = TABLE_NAME_TS_RANGE+"CreateSubmittableJobWithArgs";
486    // clean up content of TABLE_NAME
487    Table table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM));
488
489    ts = System.currentTimeMillis();
490    put1.addColumn(family, col1, ts, Bytes.toBytes("val1"));
491    table.put(put1);
492    Thread.sleep(100);
493
494    ts = System.currentTimeMillis();
495    put2.addColumn(family, col1, ts, Bytes.toBytes("val2"));
496    put3.addColumn(family, col1, ts, Bytes.toBytes("val3"));
497    table.put(put2);
498    table.put(put3);
499    table.close();
500
501    String[] args = new String[] {
502      tableName, COL_FAM + ":" + COL1,
503      "--starttime=" + 0,
504      "--endtime=" + ts
505    };
506    runCreateSubmittableJobWithArgs(args, 1);
507
508    args = new String[] {
509      tableName, COL_FAM + ":" + COL1,
510      "--starttime=" + 0,
511      "--endtime=" + (ts - 10)
512    };
513    runCreateSubmittableJobWithArgs(args, 1);
514
515    args = new String[] {
516      tableName, COL_FAM + ":" + COL1,
517      "--starttime=" + ts,
518      "--endtime=" + (ts + 1000)
519    };
520    runCreateSubmittableJobWithArgs(args, 2);
521
522    args = new String[] {
523      tableName, COL_FAM + ":" + COL1,
524      "--starttime=" + (ts - 30 * 1000),
525      "--endtime=" + (ts + 30 * 1000),
526    };
527    runCreateSubmittableJobWithArgs(args, 3);
528  }
529
530  /**
531   * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
532   * two columns, Few have one.
533   *
534   * @param table
535   * @throws IOException
536   */
537  private static void writeRows(Table table, int totalRows, int rowsWithOneCol) throws IOException {
538    final byte[] family = Bytes.toBytes(COL_FAM);
539    final byte[] value = Bytes.toBytes("abcd");
540    final byte[] col1 = Bytes.toBytes(COL1);
541    final byte[] col2 = Bytes.toBytes(COL2);
542    final byte[] col3 = Bytes.toBytes(COMPOSITE_COLUMN);
543    ArrayList<Put> rowsUpdate = new ArrayList<>();
544    // write few rows with two columns
545    int i = 0;
546    for (; i < totalRows - rowsWithOneCol; i++) {
547      // Use binary rows values to test for HBASE-15287.
548      byte[] row = Bytes.toBytesBinary("\\x00row" + i);
549      Put put = new Put(row);
550      put.addColumn(family, col1, value);
551      put.addColumn(family, col2, value);
552      put.addColumn(family, col3, value);
553      rowsUpdate.add(put);
554    }
555
556    // write few rows with only one column
557    for (; i < totalRows; i++) {
558      byte[] row = Bytes.toBytes("row" + i);
559      Put put = new Put(row);
560      put.addColumn(family, col2, value);
561      rowsUpdate.add(put);
562    }
563    table.put(rowsUpdate);
564  }
565
566  /**
567   * test main method. Import should print help and call System.exit
568   */
569  @Test
570  public void testImportMain() throws Exception {
571    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
572    LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
573    System.setSecurityManager(newSecurityManager);
574    String[] args = {};
575    try {
576      try {
577        RowCounter.main(args);
578        fail("should be SecurityException");
579      } catch (SecurityException e) {
580        assertEquals(RowCounter.EXIT_FAILURE, newSecurityManager.getExitCode());
581      }
582      try {
583        args = new String[2];
584        args[0] = "table";
585        args[1] = "--range=1";
586        RowCounter.main(args);
587        fail("should be SecurityException");
588      } catch (SecurityException e) {
589        assertEquals(RowCounter.EXIT_FAILURE, newSecurityManager.getExitCode());
590      }
591
592    } finally {
593      System.setSecurityManager(SECURITY_MANAGER);
594    }
595  }
596
597  @Test
598  public void testHelp() throws Exception {
599    PrintStream oldPrintStream = System.out;
600    try {
601      ByteArrayOutputStream data = new ByteArrayOutputStream();
602      PrintStream stream = new PrintStream(data);
603      System.setOut(stream);
604      String[] args = {"-h"};
605      runRowCount(args, 0);
606      assertUsageContent(data.toString());
607      args = new String[]{"--help"};
608      runRowCount(args, 0);
609      assertUsageContent(data.toString());
610    }finally {
611      System.setOut(oldPrintStream);
612    }
613  }
614
615  @Test
616  public void testInvalidTable() throws Exception {
617    try {
618      String[] args = {"invalid"};
619      runRowCount(args, 0);
620      fail("RowCounter should had failed with invalid table.");
621    }catch (Throwable e){
622      assertTrue(e instanceof AssertionError);
623    }
624  }
625
626  private void assertUsageContent(String usage) {
627    assertTrue(usage.contains("usage: hbase rowcounter "
628      + "<tablename> [options] [<column1> <column2>...]"));
629    assertTrue(usage.contains("Options:\n"));
630    assertTrue(usage.contains("--starttime=<arg>       "
631      + "starting time filter to start counting rows from.\n"));
632    assertTrue(usage.contains("--endtime=<arg>         "
633      + "end time filter limit, to only count rows up to this timestamp.\n"));
634    assertTrue(usage.contains("--range=<arg>           "
635      + "[startKey],[endKey][;[startKey],[endKey]...]]\n"));
636    assertTrue(usage.contains("--expectedCount=<arg>   expected number of rows to be count.\n"));
637    assertTrue(usage.contains("For performance, "
638      + "consider the following configuration properties:\n"));
639    assertTrue(usage.contains("-Dhbase.client.scanner.caching=100\n"));
640    assertTrue(usage.contains("-Dmapreduce.map.speculative=false\n"));
641  }
642
643}