001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.ArgumentMatchers.anyLong;
024import static org.mockito.Matchers.any;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.times;
027
028import java.io.ByteArrayOutputStream;
029import java.io.IOException;
030import java.io.PrintStream;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
035import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper;
036import org.apache.hadoop.hbase.testclassification.MapReduceTests;
037import org.apache.hadoop.hbase.testclassification.SmallTests;
038import org.apache.hadoop.mapred.JobConf;
039import org.apache.hadoop.mapred.OutputCollector;
040import org.apache.hadoop.mapred.Reporter;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.mockito.Mockito;
045
046import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
047
048@Category({MapReduceTests.class, SmallTests.class})
049public class TestRowCounter {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053      HBaseClassTestRule.forClass(TestRowCounter.class);
054
055  @Test
056  @SuppressWarnings("deprecation")
057  public void shouldPrintUsage() throws Exception {
058    String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
059    String result = new OutputReader(System.out) {
060      @Override
061      void doRead() {
062        assertEquals(-1, RowCounter.printUsage());
063      }
064    }.read();
065
066    assertTrue(result.startsWith(expectedOutput));
067  }
068
069  @Test
070  @SuppressWarnings("deprecation")
071  public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree()
072      throws Exception {
073    final String[] args = new String[] { "one", "two" };
074    String line = "ERROR: Wrong number of parameters: " + args.length;
075    String result = new OutputReader(System.err) {
076      @Override
077      void doRead() throws Exception {
078        assertEquals(-1, new RowCounter().run(args));
079      }
080    }.read();
081
082    assertTrue(result.startsWith(line));
083  }
084
085  @Test
086  @SuppressWarnings({ "deprecation", "unchecked" })
087  public void shouldRegInReportEveryIncomingRow() throws IOException {
088    int iterationNumber = 999;
089    RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
090    Reporter reporter = mock(Reporter.class);
091    for (int i = 0; i < iterationNumber; i++)
092      mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
093          mock(OutputCollector.class), reporter);
094
095    Mockito.verify(reporter, times(iterationNumber)).incrCounter(
096        any(), anyLong());
097  }
098
099  @Test
100  @SuppressWarnings({ "deprecation" })
101  public void shouldCreateAndRunSubmittableJob() throws Exception {
102    RowCounter rCounter = new RowCounter();
103    rCounter.setConf(HBaseConfiguration.create());
104    String[] args = new String[] { "\temp", "tableA", "column1", "column2",
105        "column3" };
106    JobConf jobConfig = rCounter.createSubmittableJob(args);
107
108    assertNotNull(jobConfig);
109    assertEquals(0, jobConfig.getNumReduceTasks());
110    assertEquals("rowcounter", jobConfig.getJobName());
111    assertEquals(jobConfig.getMapOutputValueClass(), Result.class);
112    assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class);
113    assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST), Joiner.on(' ')
114        .join("column1", "column2", "column3"));
115    assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class);
116  }
117
118  enum Outs {
119    OUT, ERR
120  }
121
122  private static abstract class OutputReader {
123    private final PrintStream ps;
124    private PrintStream oldPrintStream;
125    private Outs outs;
126
127    protected OutputReader(PrintStream ps) {
128      this.ps = ps;
129    }
130
131    protected String read() throws Exception {
132      ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
133      if (ps == System.out) {
134        oldPrintStream = System.out;
135        outs = Outs.OUT;
136        System.setOut(new PrintStream(outBytes));
137      } else if (ps == System.err) {
138        oldPrintStream = System.err;
139        outs = Outs.ERR;
140        System.setErr(new PrintStream(outBytes));
141      } else {
142        throw new IllegalStateException("OutputReader: unsupported PrintStream");
143      }
144
145      try {
146        doRead();
147        return new String(outBytes.toByteArray());
148      } finally {
149        switch (outs) {
150        case OUT: {
151          System.setOut(oldPrintStream);
152          break;
153        }
154        case ERR: {
155          System.setErr(oldPrintStream);
156          break;
157        }
158        default:
159          throw new IllegalStateException(
160              "OutputReader: unsupported PrintStream");
161        }
162      }
163    }
164
165    abstract void doRead() throws Exception;
166  }
167}