001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.File; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Locale; 028import java.util.Map; 029import java.util.NavigableMap; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileUtil; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.Scan; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.io.NullWritable; 041import org.apache.hadoop.mapreduce.Job; 042import org.apache.hadoop.mapreduce.Reducer; 043import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 044import org.junit.After; 045import org.junit.AfterClass; 046import org.junit.BeforeClass; 047import org.junit.Test; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 052 053/** 054 * Base set of tests and setup for input formats touching multiple tables. 055 */ 056public abstract class MultiTableInputFormatTestBase { 057 static final Logger LOG = LoggerFactory.getLogger(TestMultiTableInputFormat.class); 058 public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 059 static final String TABLE_NAME = "scantest"; 060 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); 061 static final String KEY_STARTROW = "startRow"; 062 static final String KEY_LASTROW = "stpRow"; 063 064 static List<String> TABLES = Lists.newArrayList(); 065 066 static { 067 for (int i = 0; i < 3; i++) { 068 TABLES.add(TABLE_NAME + String.valueOf(i)); 069 } 070 } 071 072 @BeforeClass 073 public static void setUpBeforeClass() throws Exception { 074 // switch TIF to log at DEBUG level 075 TEST_UTIL.enableDebug(MultiTableInputFormatBase.class); 076 // start mini hbase cluster 077 TEST_UTIL.startMiniCluster(3); 078 // create and fill table 079 for (String tableName : TABLES) { 080 try (Table table = 081 TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), INPUT_FAMILY, 4)) { 082 TEST_UTIL.loadTable(table, INPUT_FAMILY, false); 083 } 084 } 085 } 086 087 @AfterClass 088 public static void tearDownAfterClass() throws Exception { 089 TEST_UTIL.shutdownMiniCluster(); 090 } 091 092 @After 093 public void tearDown() throws Exception { 094 Configuration c = TEST_UTIL.getConfiguration(); 095 FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); 096 } 097 098 /** 099 * Pass the key and value to reducer. 100 */ 101 public static class ScanMapper 102 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { 103 /** 104 * Pass the key and value to reduce. 105 * @param key The key, here "aaa", "aab" etc. 106 * @param value The value is the same as the key. 107 * @param context The task context. 108 * @throws IOException When reading the rows fails. 109 */ 110 @Override 111 public void map(ImmutableBytesWritable key, Result value, Context context) 112 throws IOException, InterruptedException { 113 makeAssertions(key, value); 114 context.write(key, key); 115 } 116 117 public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException { 118 if (value.size() != 1) { 119 throw new IOException("There should only be one input column"); 120 } 121 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = value.getMap(); 122 if (!cf.containsKey(INPUT_FAMILY)) { 123 throw new IOException( 124 "Wrong input columns. Missing: '" + Bytes.toString(INPUT_FAMILY) + "'."); 125 } 126 String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); 127 LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val); 128 } 129 } 130 131 /** 132 * Checks the last and first keys seen against the scanner boundaries. 133 */ 134 public static class ScanReducer 135 extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> { 136 private String first = null; 137 private String last = null; 138 139 @Override 140 protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, 141 Context context) throws IOException, InterruptedException { 142 makeAssertions(key, values); 143 } 144 145 protected void makeAssertions(ImmutableBytesWritable key, 146 Iterable<ImmutableBytesWritable> values) { 147 int count = 0; 148 for (ImmutableBytesWritable value : values) { 149 String val = Bytes.toStringBinary(value.get()); 150 LOG.debug( 151 "reduce: key[" + count + "] -> " + Bytes.toStringBinary(key.get()) + ", value -> " + val); 152 if (first == null) first = val; 153 last = val; 154 count++; 155 } 156 assertEquals(3, count); 157 } 158 159 @Override 160 protected void cleanup(Context context) throws IOException, InterruptedException { 161 Configuration c = context.getConfiguration(); 162 cleanup(c); 163 } 164 165 protected void cleanup(Configuration c) { 166 String startRow = c.get(KEY_STARTROW); 167 String lastRow = c.get(KEY_LASTROW); 168 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); 169 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); 170 if (startRow != null && startRow.length() > 0) { 171 assertEquals(startRow, first); 172 } 173 if (lastRow != null && lastRow.length() > 0) { 174 assertEquals(lastRow, last); 175 } 176 } 177 } 178 179 @Test 180 public void testScanEmptyToEmpty() 181 throws IOException, InterruptedException, ClassNotFoundException { 182 testScan(null, null, null); 183 } 184 185 @Test 186 public void testScanEmptyToAPP() 187 throws IOException, InterruptedException, ClassNotFoundException { 188 testScan(null, "app", "apo"); 189 } 190 191 @Test 192 public void testScanOBBToOPP() throws IOException, InterruptedException, ClassNotFoundException { 193 testScan("obb", "opp", "opo"); 194 } 195 196 @Test 197 public void testScanYZYToEmpty() 198 throws IOException, InterruptedException, ClassNotFoundException { 199 testScan("yzy", null, "zzz"); 200 } 201 202 /** 203 * Tests a MR scan using specific start and stop rows. nnn 204 */ 205 private void testScan(String start, String stop, String last) 206 throws IOException, InterruptedException, ClassNotFoundException { 207 String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" 208 + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); 209 LOG.info("Before map/reduce startup - job " + jobName); 210 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 211 212 c.set(KEY_STARTROW, start != null ? start : ""); 213 c.set(KEY_LASTROW, last != null ? last : ""); 214 215 List<Scan> scans = new ArrayList<>(); 216 217 for (String tableName : TABLES) { 218 Scan scan = new Scan(); 219 220 scan.addFamily(INPUT_FAMILY); 221 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)); 222 223 if (start != null) { 224 scan.setStartRow(Bytes.toBytes(start)); 225 } 226 if (stop != null) { 227 scan.setStopRow(Bytes.toBytes(stop)); 228 } 229 230 scans.add(scan); 231 232 LOG.info("scan before: " + scan); 233 } 234 235 runJob(jobName, c, scans); 236 } 237 238 protected void runJob(String jobName, Configuration c, List<Scan> scans) 239 throws IOException, InterruptedException, ClassNotFoundException { 240 Job job = new Job(c, jobName); 241 242 initJob(scans, job); 243 job.setReducerClass(ScanReducer.class); 244 job.setNumReduceTasks(1); // one to get final "first" and "last" key 245 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 246 LOG.info("Started " + job.getJobName()); 247 job.waitForCompletion(true); 248 assertTrue(job.isSuccessful()); 249 LOG.info("After map/reduce completion - job " + jobName); 250 } 251 252 protected abstract void initJob(List<Scan> scans, Job job) throws IOException; 253 254}