001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.ArgumentMatchers.anyLong;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.times;
027
028import java.io.ByteArrayOutputStream;
029import java.io.IOException;
030import java.io.PrintStream;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.client.Result;
033import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
034import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper;
035import org.apache.hadoop.hbase.testclassification.MapReduceTests;
036import org.apache.hadoop.hbase.testclassification.MediumTests;
037import org.apache.hadoop.mapred.JobConf;
038import org.apache.hadoop.mapred.OutputCollector;
039import org.apache.hadoop.mapred.Reporter;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042import org.mockito.Mockito;
043
044import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
045
046@Tag(MapReduceTests.TAG)
047@Tag(MediumTests.TAG)
048public class TestRowCounter {
049
050  @Test
051  @SuppressWarnings("deprecation")
052  public void shouldPrintUsage() throws Exception {
053    String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
054    String result = new OutputReader(System.out) {
055      @Override
056      void doRead() {
057        assertEquals(-1, RowCounter.printUsage());
058      }
059    }.read();
060
061    assertTrue(result.startsWith(expectedOutput));
062  }
063
064  @Test
065  @SuppressWarnings("deprecation")
066  public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree() throws Exception {
067    final String[] args = new String[] { "one", "two" };
068    String line = "ERROR: Wrong number of parameters: " + args.length;
069    String result = new OutputReader(System.err) {
070      @Override
071      void doRead() throws Exception {
072        assertEquals(-1, new RowCounter().run(args));
073      }
074    }.read();
075
076    assertTrue(result.startsWith(line));
077  }
078
079  @Test
080  @SuppressWarnings({ "deprecation", "unchecked" })
081  public void shouldRegInReportEveryIncomingRow() throws IOException {
082    int iterationNumber = 999;
083    RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
084    Reporter reporter = mock(Reporter.class);
085    for (int i = 0; i < iterationNumber; i++)
086      mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
087        mock(OutputCollector.class), reporter);
088
089    Mockito.verify(reporter, times(iterationNumber)).incrCounter(any(), anyLong());
090  }
091
092  @Test
093  @SuppressWarnings({ "deprecation" })
094  public void shouldCreateAndRunSubmittableJob() throws Exception {
095    RowCounter rCounter = new RowCounter();
096    rCounter.setConf(HBaseConfiguration.create());
097    String[] args = new String[] { "\temp", "tableA", "column1", "column2", "column3" };
098    JobConf jobConfig = rCounter.createSubmittableJob(args);
099
100    assertNotNull(jobConfig);
101    assertEquals(0, jobConfig.getNumReduceTasks());
102    assertEquals("rowcounter", jobConfig.getJobName());
103    assertEquals(jobConfig.getMapOutputValueClass(), Result.class);
104    assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class);
105    assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST),
106      Joiner.on(' ').join("column1", "column2", "column3"));
107    assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class);
108  }
109
110  enum Outs {
111    OUT,
112    ERR
113  }
114
115  private static abstract class OutputReader {
116    private final PrintStream ps;
117    private PrintStream oldPrintStream;
118    private Outs outs;
119
120    protected OutputReader(PrintStream ps) {
121      this.ps = ps;
122    }
123
124    protected String read() throws Exception {
125      ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
126      if (ps == System.out) {
127        oldPrintStream = System.out;
128        outs = Outs.OUT;
129        System.setOut(new PrintStream(outBytes));
130      } else if (ps == System.err) {
131        oldPrintStream = System.err;
132        outs = Outs.ERR;
133        System.setErr(new PrintStream(outBytes));
134      } else {
135        throw new IllegalStateException("OutputReader: unsupported PrintStream");
136      }
137
138      try {
139        doRead();
140        return new String(outBytes.toByteArray());
141      } finally {
142        switch (outs) {
143          case OUT: {
144            System.setOut(oldPrintStream);
145            break;
146          }
147          case ERR: {
148            System.setErr(oldPrintStream);
149            break;
150          }
151          default:
152            throw new IllegalStateException("OutputReader: unsupported PrintStream");
153        }
154      }
155    }
156
157    abstract void doRead() throws Exception;
158  }
159}