001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.ByteArrayOutputStream;
025import java.io.IOException;
026import java.io.PrintStream;
027import java.util.ArrayList;
028import java.util.Arrays;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.testclassification.LargeTests;
035import org.apache.hadoop.hbase.testclassification.MapReduceTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.LauncherSecurityManager;
039import org.apache.hadoop.mapreduce.Counter;
040import org.apache.hadoop.mapreduce.Job;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Test the rowcounter map reduce job.
051 */
052@Category({ MapReduceTests.class, LargeTests.class })
053public class TestRowCounter {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestRowCounter.class);
058
059  private static final Logger LOG = LoggerFactory.getLogger(TestRowCounter.class);
060  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
061  private final static String TABLE_NAME = "testRowCounter";
062  private final static String TABLE_NAME_TS_RANGE = "testRowCounter_ts_range";
063  private final static String COL_FAM = "col_fam";
064  private final static String COL1 = "c1";
065  private final static String COL2 = "c2";
066  private final static String COMPOSITE_COLUMN = "C:A:A";
067  private final static int TOTAL_ROWS = 10;
068  private final static int ROWS_WITH_ONE_COL = 2;
069
070  /**
071   * @throws java.lang.Exception
072   */
073  @BeforeClass
074  public static void setUpBeforeClass() throws Exception {
075    TEST_UTIL.startMiniCluster();
076    Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
077    writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL);
078    table.close();
079  }
080
081  /**
082   * @throws java.lang.Exception
083   */
084  @AfterClass
085  public static void tearDownAfterClass() throws Exception {
086    TEST_UTIL.shutdownMiniCluster();
087  }
088
089  /**
090   * Test a case when no column was specified in command line arguments. n
091   */
092  @Test
093  public void testRowCounterNoColumn() throws Exception {
094    String[] args = new String[] { TABLE_NAME };
095    runRowCount(args, 10);
096  }
097
098  /**
099   * Test a case when the column specified in command line arguments is exclusive for few rows. n
100   */
101  @Test
102  public void testRowCounterExclusiveColumn() throws Exception {
103    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL1 };
104    runRowCount(args, 8);
105  }
106
107  /**
108   * Test a case when the column specified in command line arguments is one for which the qualifier
109   * contains colons. n
110   */
111  @Test
112  public void testRowCounterColumnWithColonInQualifier() throws Exception {
113    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN };
114    runRowCount(args, 8);
115  }
116
117  /**
118   * Test a case when the column specified in command line arguments is not part of first KV for a
119   * row. n
120   */
121  @Test
122  public void testRowCounterHiddenColumn() throws Exception {
123    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL2 };
124    runRowCount(args, 10);
125  }
126
127  /**
128   * Test a case when the column specified in command line arguments is exclusive for few rows and
129   * also a row range filter is specified n
130   */
131  @Test
132  public void testRowCounterColumnAndRowRange() throws Exception {
133    String[] args = new String[] { TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1 };
134    runRowCount(args, 8);
135  }
136
137  /**
138   * Test a case when a range is specified with single range of start-end keys n
139   */
140  @Test
141  public void testRowCounterRowSingleRange() throws Exception {
142    String[] args = new String[] { TABLE_NAME, "--range=\\x00row1,\\x00row3" };
143    runRowCount(args, 2);
144  }
145
146  /**
147   * Test a case when a range is specified with single range with end key only n
148   */
149  @Test
150  public void testRowCounterRowSingleRangeUpperBound() throws Exception {
151    String[] args = new String[] { TABLE_NAME, "--range=,\\x00row3" };
152    runRowCount(args, 3);
153  }
154
155  /**
156   * Test a case when a range is specified with two ranges where one range is with end key only n
157   */
158  @Test
159  public void testRowCounterRowMultiRangeUpperBound() throws Exception {
160    String[] args = new String[] { TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7" };
161    runRowCount(args, 5);
162  }
163
164  /**
165   * Test a case when a range is specified with multiple ranges of start-end keys n
166   */
167  @Test
168  public void testRowCounterRowMultiRange() throws Exception {
169    String[] args = new String[] { TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8" };
170    runRowCount(args, 5);
171  }
172
173  /**
174   * Test a case when a range is specified with multiple ranges of start-end keys; one range is
175   * filled, another two are not n
176   */
177  @Test
178  public void testRowCounterRowMultiEmptyRange() throws Exception {
179    String[] args = new String[] { TABLE_NAME, "--range=\\x00row1,\\x00row3;;" };
180    runRowCount(args, 2);
181  }
182
183  @Test
184  public void testRowCounter10kRowRange() throws Exception {
185    String tableName = TABLE_NAME + "10k";
186
187    try (
188      Table table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) {
189      writeRows(table, 10000, 0);
190    }
191    String[] args = new String[] { tableName, "--range=\\x00row9872,\\x00row9875" };
192    runRowCount(args, 3);
193  }
194
195  /**
196   * Test a case when the timerange is specified with --starttime and --endtime options n
197   */
198  @Test
199  public void testRowCounterTimeRange() throws Exception {
200    final byte[] family = Bytes.toBytes(COL_FAM);
201    final byte[] col1 = Bytes.toBytes(COL1);
202    Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
203    Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
204    Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
205
206    long ts;
207
208    // clean up content of TABLE_NAME
209    Table table =
210      TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME_TS_RANGE), Bytes.toBytes(COL_FAM));
211
212    ts = EnvironmentEdgeManager.currentTime();
213    put1.addColumn(family, col1, ts, Bytes.toBytes("val1"));
214    table.put(put1);
215    Thread.sleep(100);
216
217    ts = EnvironmentEdgeManager.currentTime();
218    put2.addColumn(family, col1, ts, Bytes.toBytes("val2"));
219    put3.addColumn(family, col1, ts, Bytes.toBytes("val3"));
220    table.put(put2);
221    table.put(put3);
222    table.close();
223
224    String[] args = new String[] { TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, "--starttime=" + 0,
225      "--endtime=" + ts };
226    runRowCount(args, 1);
227
228    args = new String[] { TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, "--starttime=" + 0,
229      "--endtime=" + (ts - 10) };
230    runRowCount(args, 1);
231
232    args = new String[] { TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, "--starttime=" + ts,
233      "--endtime=" + (ts + 1000) };
234    runRowCount(args, 2);
235
236    args = new String[] { TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
237      "--starttime=" + (ts - 30 * 1000), "--endtime=" + (ts + 30 * 1000), };
238    runRowCount(args, 3);
239  }
240
241  /**
242   * Run the RowCounter map reduce job and verify the row count.
243   * @param args          the command line arguments to be used for rowcounter job.
244   * @param expectedCount the expected row count (result of map reduce job). n
245   */
246  private void runRowCount(String[] args, int expectedCount) throws Exception {
247    RowCounter rowCounter = new RowCounter();
248    rowCounter.setConf(TEST_UTIL.getConfiguration());
249    args = Arrays.copyOf(args, args.length + 1);
250    args[args.length - 1] = "--expectedCount=" + expectedCount;
251    long start = EnvironmentEdgeManager.currentTime();
252    int result = rowCounter.run(args);
253    long duration = EnvironmentEdgeManager.currentTime() - start;
254    LOG.debug("row count duration (ms): " + duration);
255    assertTrue(result == 0);
256  }
257
258  /**
259   * Run the RowCounter map reduce job and verify the row count.
260   * @param args          the command line arguments to be used for rowcounter job.
261   * @param expectedCount the expected row count (result of map reduce job).
262   * @throws Exception in case of any unexpected error.
263   */
264  private void runCreateSubmittableJobWithArgs(String[] args, int expectedCount) throws Exception {
265    Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args);
266    long start = EnvironmentEdgeManager.currentTime();
267    job.waitForCompletion(true);
268    long duration = EnvironmentEdgeManager.currentTime() - start;
269    LOG.debug("row count duration (ms): " + duration);
270    assertTrue(job.isSuccessful());
271    Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS);
272    assertEquals(expectedCount, counter.getValue());
273  }
274
275  @Test
276  public void testCreateSubmittableJobWithArgsNoColumn() throws Exception {
277    String[] args = new String[] { TABLE_NAME };
278    runCreateSubmittableJobWithArgs(args, 10);
279  }
280
281  /**
282   * Test a case when the column specified in command line arguments is exclusive for few rows.
283   * @throws Exception in case of any unexpected error.
284   */
285  @Test
286  public void testCreateSubmittableJobWithArgsExclusiveColumn() throws Exception {
287    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL1 };
288    runCreateSubmittableJobWithArgs(args, 8);
289  }
290
291  /**
292   * Test a case when the column specified in command line arguments is one for which the qualifier
293   * contains colons.
294   * @throws Exception in case of any unexpected error.
295   */
296  @Test
297  public void testCreateSubmittableJobWithArgsColumnWithColonInQualifier() throws Exception {
298    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN };
299    runCreateSubmittableJobWithArgs(args, 8);
300  }
301
302  /**
303   * Test a case when the column specified in command line arguments is not part of first KV for a
304   * row.
305   * @throws Exception in case of any unexpected error.
306   */
307  @Test
308  public void testCreateSubmittableJobWithArgsHiddenColumn() throws Exception {
309    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL2 };
310    runCreateSubmittableJobWithArgs(args, 10);
311  }
312
313  /**
314   * Test a case when the column specified in command line arguments is exclusive for few rows and
315   * also a row range filter is specified
316   * @throws Exception in case of any unexpected error.
317   */
318  @Test
319  public void testCreateSubmittableJobWithArgsColumnAndRowRange() throws Exception {
320    String[] args = new String[] { TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1 };
321    runCreateSubmittableJobWithArgs(args, 8);
322  }
323
324  /**
325   * Test a case when a range is specified with single range of start-end keys
326   * @throws Exception in case of any unexpected error.
327   */
328  @Test
329  public void testCreateSubmittableJobWithArgsRowSingleRange() throws Exception {
330    String[] args = new String[] { TABLE_NAME, "--range=\\x00row1,\\x00row3" };
331    runCreateSubmittableJobWithArgs(args, 2);
332  }
333
334  /**
335   * Test a case when a range is specified with single range with end key only
336   * @throws Exception in case of any unexpected error.
337   */
338  @Test
339  public void testCreateSubmittableJobWithArgsRowSingleRangeUpperBound() throws Exception {
340    String[] args = new String[] { TABLE_NAME, "--range=,\\x00row3" };
341    runCreateSubmittableJobWithArgs(args, 3);
342  }
343
344  /**
345   * Test a case when a range is specified with two ranges where one range is with end key only
346   * @throws Exception in case of any unexpected error.
347   */
348  @Test
349  public void testCreateSubmittableJobWithArgsRowMultiRangeUpperBound() throws Exception {
350    String[] args = new String[] { TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7" };
351    runCreateSubmittableJobWithArgs(args, 5);
352  }
353
354  /**
355   * Test a case when a range is specified with multiple ranges of start-end keys
356   * @throws Exception in case of any unexpected error.
357   */
358  @Test
359  public void testCreateSubmittableJobWithArgsRowMultiRange() throws Exception {
360    String[] args = new String[] { TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8" };
361    runCreateSubmittableJobWithArgs(args, 5);
362  }
363
364  /**
365   * Test a case when a range is specified with multiple ranges of start-end keys; one range is
366   * filled, another two are not
367   * @throws Exception in case of any unexpected error.
368   */
369  @Test
370  public void testCreateSubmittableJobWithArgsRowMultiEmptyRange() throws Exception {
371    String[] args = new String[] { TABLE_NAME, "--range=\\x00row1,\\x00row3;;" };
372    runCreateSubmittableJobWithArgs(args, 2);
373  }
374
375  @Test
376  public void testCreateSubmittableJobWithArgs10kRowRange() throws Exception {
377    String tableName = TABLE_NAME + "CreateSubmittableJobWithArgs10kRowRange";
378
379    try (
380      Table table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) {
381      writeRows(table, 10000, 0);
382    }
383    String[] args = new String[] { tableName, "--range=\\x00row9872,\\x00row9875" };
384    runCreateSubmittableJobWithArgs(args, 3);
385  }
386
387  /**
388   * Test a case when the timerange is specified with --starttime and --endtime options
389   * @throws Exception in case of any unexpected error.
390   */
391  @Test
392  public void testCreateSubmittableJobWithArgsTimeRange() throws Exception {
393    final byte[] family = Bytes.toBytes(COL_FAM);
394    final byte[] col1 = Bytes.toBytes(COL1);
395    Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
396    Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
397    Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
398
399    long ts;
400
401    String tableName = TABLE_NAME_TS_RANGE + "CreateSubmittableJobWithArgs";
402    // clean up content of TABLE_NAME
403    Table table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM));
404
405    ts = EnvironmentEdgeManager.currentTime();
406    put1.addColumn(family, col1, ts, Bytes.toBytes("val1"));
407    table.put(put1);
408    Thread.sleep(100);
409
410    ts = EnvironmentEdgeManager.currentTime();
411    put2.addColumn(family, col1, ts, Bytes.toBytes("val2"));
412    put3.addColumn(family, col1, ts, Bytes.toBytes("val3"));
413    table.put(put2);
414    table.put(put3);
415    table.close();
416
417    String[] args =
418      new String[] { tableName, COL_FAM + ":" + COL1, "--starttime=" + 0, "--endtime=" + ts };
419    runCreateSubmittableJobWithArgs(args, 1);
420
421    args = new String[] { tableName, COL_FAM + ":" + COL1, "--starttime=" + 0,
422      "--endtime=" + (ts - 10) };
423    runCreateSubmittableJobWithArgs(args, 1);
424
425    args = new String[] { tableName, COL_FAM + ":" + COL1, "--starttime=" + ts,
426      "--endtime=" + (ts + 1000) };
427    runCreateSubmittableJobWithArgs(args, 2);
428
429    args = new String[] { tableName, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000),
430      "--endtime=" + (ts + 30 * 1000), };
431    runCreateSubmittableJobWithArgs(args, 3);
432  }
433
434  /**
435   * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have two columns, Few have
436   * one. nn
437   */
438  private static void writeRows(Table table, int totalRows, int rowsWithOneCol) throws IOException {
439    final byte[] family = Bytes.toBytes(COL_FAM);
440    final byte[] value = Bytes.toBytes("abcd");
441    final byte[] col1 = Bytes.toBytes(COL1);
442    final byte[] col2 = Bytes.toBytes(COL2);
443    final byte[] col3 = Bytes.toBytes(COMPOSITE_COLUMN);
444    ArrayList<Put> rowsUpdate = new ArrayList<>();
445    // write few rows with two columns
446    int i = 0;
447    for (; i < totalRows - rowsWithOneCol; i++) {
448      // Use binary rows values to test for HBASE-15287.
449      byte[] row = Bytes.toBytesBinary("\\x00row" + i);
450      Put put = new Put(row);
451      put.addColumn(family, col1, value);
452      put.addColumn(family, col2, value);
453      put.addColumn(family, col3, value);
454      rowsUpdate.add(put);
455    }
456
457    // write few rows with only one column
458    for (; i < totalRows; i++) {
459      byte[] row = Bytes.toBytes("row" + i);
460      Put put = new Put(row);
461      put.addColumn(family, col2, value);
462      rowsUpdate.add(put);
463    }
464    table.put(rowsUpdate);
465  }
466
467  /**
468   * test main method. Import should print help and call System.exit
469   */
470  @Test
471  public void testImportMain() throws Exception {
472    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
473    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
474    System.setSecurityManager(newSecurityManager);
475    String[] args = {};
476    try {
477      try {
478        RowCounter.main(args);
479        fail("should be SecurityException");
480      } catch (SecurityException e) {
481        assertEquals(RowCounter.EXIT_FAILURE, newSecurityManager.getExitCode());
482      }
483      try {
484        args = new String[2];
485        args[0] = "table";
486        args[1] = "--range=1";
487        RowCounter.main(args);
488        fail("should be SecurityException");
489      } catch (SecurityException e) {
490        assertEquals(RowCounter.EXIT_FAILURE, newSecurityManager.getExitCode());
491      }
492
493    } finally {
494      System.setSecurityManager(SECURITY_MANAGER);
495    }
496  }
497
498  @Test
499  public void testHelp() throws Exception {
500    PrintStream oldPrintStream = System.out;
501    try {
502      ByteArrayOutputStream data = new ByteArrayOutputStream();
503      PrintStream stream = new PrintStream(data);
504      System.setOut(stream);
505      String[] args = { "-h" };
506      runRowCount(args, 0);
507      assertUsageContent(data.toString());
508      args = new String[] { "--help" };
509      runRowCount(args, 0);
510      assertUsageContent(data.toString());
511    } finally {
512      System.setOut(oldPrintStream);
513    }
514  }
515
516  @Test
517  public void testInvalidTable() throws Exception {
518    try {
519      String[] args = { "invalid" };
520      runRowCount(args, 0);
521      fail("RowCounter should had failed with invalid table.");
522    } catch (Throwable e) {
523      assertTrue(e instanceof AssertionError);
524    }
525  }
526
527  private void assertUsageContent(String usage) {
528    assertTrue(usage
529      .contains("usage: hbase rowcounter " + "<tablename> [options] [<column1> <column2>...]"));
530    assertTrue(usage.contains("Options:\n"));
531    assertTrue(usage.contains(
532      "--starttime=<arg>       " + "starting time filter to start counting rows from.\n"));
533    assertTrue(usage.contains("--endtime=<arg>         "
534      + "end time filter limit, to only count rows up to this timestamp.\n"));
535    assertTrue(usage
536      .contains("--range=<arg>           " + "[startKey],[endKey][;[startKey],[endKey]...]]\n"));
537    assertTrue(usage.contains("--expectedCount=<arg>   expected number of rows to be count.\n"));
538    assertTrue(
539      usage.contains("For performance, " + "consider the following configuration properties:\n"));
540    assertTrue(usage.contains("-Dhbase.client.scanner.caching=100\n"));
541    assertTrue(usage.contains("-Dmapreduce.map.speculative=false\n"));
542  }
543
544}