001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapred; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.mockito.ArgumentMatchers.any; 024import static org.mockito.ArgumentMatchers.anyLong; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.times; 027 028import java.io.ByteArrayOutputStream; 029import java.io.IOException; 030import java.io.PrintStream; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.client.Result; 033import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 034import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper; 035import org.apache.hadoop.hbase.testclassification.MapReduceTests; 036import org.apache.hadoop.hbase.testclassification.MediumTests; 037import org.apache.hadoop.mapred.JobConf; 038import org.apache.hadoop.mapred.OutputCollector; 039import org.apache.hadoop.mapred.Reporter; 040import org.junit.jupiter.api.Tag; 041import org.junit.jupiter.api.Test; 042import 
org.mockito.Mockito;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;

/**
 * Tests for the {@code mapred} {@link RowCounter}: usage output, argument-count validation,
 * per-row counter reporting by {@link RowCounterMapper}, and the configuration produced by
 * {@code createSubmittableJob}.
 */
@Tag(MapReduceTests.TAG)
@Tag(MediumTests.TAG)
public class TestRowCounter {

  /**
   * Checks that {@code RowCounter.printUsage()} returns -1 and that the text written to
   * {@code System.out} starts with the usage line.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void shouldPrintUsage() throws Exception {
    String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
    String result = new OutputReader(System.out) {
      @Override
      void doRead() {
        assertEquals(-1, RowCounter.printUsage());
      }
    }.read();

    assertTrue(result.startsWith(expectedOutput), "Unexpected stdout content: " + result);
  }

  /**
   * Checks that running the tool with only two arguments returns -1 and that the text written to
   * {@code System.err} starts with the wrong-parameter-count error.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree() throws Exception {
    final String[] args = new String[] { "one", "two" };
    String line = "ERROR: Wrong number of parameters: " + args.length;
    String result = new OutputReader(System.err) {
      @Override
      void doRead() throws Exception {
        assertEquals(-1, new RowCounter().run(args));
      }
    }.read();

    assertTrue(result.startsWith(line), "Unexpected stderr content: " + result);
  }

  /**
   * Feeds a fixed number of mocked rows to the mapper and verifies that the reporter's counter is
   * incremented exactly once per row.
   */
  @Test
  @SuppressWarnings({ "deprecation", "unchecked" })
  public void shouldRegInReportEveryIncomingRow() throws IOException {
    int iterationNumber = 999;
    RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
    Reporter reporter = mock(Reporter.class);
    for (int i = 0; i < iterationNumber; i++) {
      mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
        mock(OutputCollector.class), reporter);
    }

    Mockito.verify(reporter, times(iterationNumber)).incrCounter(any(), anyLong());
  }

  /**
   * Builds a job from a full argument list and checks the resulting {@link JobConf}: no reducers,
   * the job name, the mapper and map output classes, and the space-joined column list.
   */
  @Test
  @SuppressWarnings({ "deprecation" })
  public void shouldCreateAndRunSubmittableJob() throws Exception {
    RowCounter rCounter = new RowCounter();
    rCounter.setConf(HBaseConfiguration.create());
    // NOTE(review): "\temp" is a TAB character followed by "emp", not a backslash path.
    // Presumably "/temp" or "\\temp" was intended for the <outputdir> argument -- confirm before
    // changing; no assertion below inspects this value.
    String[] args = new String[] { "\temp", "tableA", "column1", "column2", "column3" };
    JobConf jobConfig = rCounter.createSubmittableJob(args);

    assertNotNull(jobConfig);
    // JUnit convention: expected value first, actual value second.
    assertEquals(0, jobConfig.getNumReduceTasks());
    assertEquals("rowcounter", jobConfig.getJobName());
    assertEquals(Result.class, jobConfig.getMapOutputValueClass());
    assertEquals(RowCounterMapper.class, jobConfig.getMapperClass());
    assertEquals(Joiner.on(' ').join("column1", "column2", "column3"),
      jobConfig.get(TableInputFormat.COLUMN_LIST));
    assertEquals(ImmutableBytesWritable.class, jobConfig.getMapOutputKeyClass());
  }

  /** Identifies which standard stream an {@link OutputReader} redirects. */
  enum Outs {
    OUT,
    ERR
  }

  /**
   * Captures everything written to {@code System.out} or {@code System.err} while
   * {@link #doRead()} runs, and restores the original stream afterwards.
   */
  private abstract static class OutputReader {
    private final PrintStream ps;

    /**
     * @param ps the stream to capture; {@link #read()} only accepts the current
     *           {@code System.out} or {@code System.err}
     */
    protected OutputReader(PrintStream ps) {
      this.ps = ps;
    }

    /**
     * Redirects the selected standard stream to an in-memory buffer, runs {@link #doRead()}, and
     * returns whatever was written to the buffer.
     * @return the captured output, decoded with the platform default charset (the same charset
     *         the capturing {@link PrintStream} encodes with)
     * @throws IllegalStateException if the stream given to the constructor is neither the current
     *                               {@code System.out} nor the current {@code System.err}
     * @throws Exception             whatever {@link #doRead()} throws
     */
    protected String read() throws Exception {
      final Outs target;
      final PrintStream previous;
      if (ps == System.out) {
        target = Outs.OUT;
        previous = System.out;
      } else if (ps == System.err) {
        target = Outs.ERR;
        previous = System.err;
      } else {
        throw new IllegalStateException("OutputReader: unsupported PrintStream");
      }

      ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
      PrintStream capture = new PrintStream(outBytes);
      if (target == Outs.OUT) {
        System.setOut(capture);
      } else {
        System.setErr(capture);
      }

      try {
        doRead();
        capture.flush();
        return new String(outBytes.toByteArray());
      } finally {
        // Restore unconditionally and never throw from here, so that a failure raised by
        // doRead() is not masked.
        if (target == Outs.OUT) {
          System.setOut(previous);
        } else {
          System.setErr(previous);
        }
      }
    }

    /** Runs the code whose console output should be captured. */
    abstract void doRead() throws Exception;
  }
}