001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.mapreduce; 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.File; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Locale; 029import java.util.Map; 030import java.util.NavigableMap; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileUtil; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Result; 037import org.apache.hadoop.hbase.client.Scan; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.io.NullWritable; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.mapreduce.Reducer; 044import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 045import org.junit.After; 046import org.junit.AfterClass; 047import org.junit.BeforeClass; 048import org.junit.Test; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 053 054/** 055 * Base set of tests and setup for input formats touching multiple tables. 056 */ 057public abstract class MultiTableInputFormatTestBase { 058 static final Logger LOG = LoggerFactory.getLogger(TestMultiTableInputFormat.class); 059 public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 060 static final String TABLE_NAME = "scantest"; 061 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); 062 static final String KEY_STARTROW = "startRow"; 063 static final String KEY_LASTROW = "stpRow"; 064 065 static List<String> TABLES = Lists.newArrayList(); 066 067 static { 068 for (int i = 0; i < 3; i++) { 069 TABLES.add(TABLE_NAME + String.valueOf(i)); 070 } 071 } 072 073 @BeforeClass 074 public static void setUpBeforeClass() throws Exception { 075 // switch TIF to log at DEBUG level 076 TEST_UTIL.enableDebug(MultiTableInputFormatBase.class); 077 // start mini hbase cluster 078 TEST_UTIL.startMiniCluster(3); 079 // create and fill table 080 for (String tableName : TABLES) { 081 try (Table table = 082 TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), 083 INPUT_FAMILY, 4)) { 084 TEST_UTIL.loadTable(table, INPUT_FAMILY, false); 085 } 086 } 087 } 088 089 @AfterClass 090 public static void tearDownAfterClass() throws Exception { 091 TEST_UTIL.shutdownMiniCluster(); 092 } 093 094 @After 095 public void tearDown() throws Exception { 096 Configuration c = TEST_UTIL.getConfiguration(); 097 FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); 098 } 099 100 /** 101 * Pass the key and value to reducer. 102 */ 103 public static class ScanMapper extends 104 TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { 105 /** 106 * Pass the key and value to reduce. 107 * 108 * @param key The key, here "aaa", "aab" etc. 109 * @param value The value is the same as the key. 110 * @param context The task context. 111 * @throws IOException When reading the rows fails. 112 */ 113 @Override 114 public void map(ImmutableBytesWritable key, Result value, Context context) 115 throws IOException, InterruptedException { 116 makeAssertions(key, value); 117 context.write(key, key); 118 } 119 120 public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException { 121 if (value.size() != 1) { 122 throw new IOException("There should only be one input column"); 123 } 124 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = 125 value.getMap(); 126 if (!cf.containsKey(INPUT_FAMILY)) { 127 throw new IOException("Wrong input columns. Missing: '" + 128 Bytes.toString(INPUT_FAMILY) + "'."); 129 } 130 String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); 131 LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + 132 ", value -> " + val); 133 } 134 } 135 136 /** 137 * Checks the last and first keys seen against the scanner boundaries. 138 */ 139 public static class ScanReducer 140 extends 141 Reducer<ImmutableBytesWritable, ImmutableBytesWritable, 142 NullWritable, NullWritable> { 143 private String first = null; 144 private String last = null; 145 146 @Override 147 protected void reduce(ImmutableBytesWritable key, 148 Iterable<ImmutableBytesWritable> values, Context context) 149 throws IOException, InterruptedException { 150 makeAssertions(key, values); 151 } 152 153 protected void makeAssertions(ImmutableBytesWritable key, 154 Iterable<ImmutableBytesWritable> values) { 155 int count = 0; 156 for (ImmutableBytesWritable value : values) { 157 String val = Bytes.toStringBinary(value.get()); 158 LOG.debug("reduce: key[" + count + "] -> " + 159 Bytes.toStringBinary(key.get()) + ", value -> " + val); 160 if (first == null) first = val; 161 last = val; 162 count++; 163 } 164 assertEquals(3, count); 165 } 166 167 @Override 168 protected void cleanup(Context context) throws IOException, 169 InterruptedException { 170 Configuration c = context.getConfiguration(); 171 cleanup(c); 172 } 173 174 protected void cleanup(Configuration c) { 175 String startRow = c.get(KEY_STARTROW); 176 String lastRow = c.get(KEY_LASTROW); 177 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + 178 startRow + "\""); 179 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + 180 "\""); 181 if (startRow != null && startRow.length() > 0) { 182 assertEquals(startRow, first); 183 } 184 if (lastRow != null && lastRow.length() > 0) { 185 assertEquals(lastRow, last); 186 } 187 } 188 } 189 190 @Test 191 public void testScanEmptyToEmpty() throws IOException, InterruptedException, 192 ClassNotFoundException { 193 testScan(null, null, null); 194 } 195 196 @Test 197 public void testScanEmptyToAPP() throws IOException, InterruptedException, 198 ClassNotFoundException { 199 testScan(null, "app", "apo"); 200 } 201 202 @Test 203 public void testScanOBBToOPP() throws IOException, InterruptedException, 204 ClassNotFoundException { 205 testScan("obb", "opp", "opo"); 206 } 207 208 @Test 209 public void testScanYZYToEmpty() throws IOException, InterruptedException, 210 ClassNotFoundException { 211 testScan("yzy", null, "zzz"); 212 } 213 214 /** 215 * Tests a MR scan using specific start and stop rows. 216 * 217 * @throws IOException 218 * @throws ClassNotFoundException 219 * @throws InterruptedException 220 */ 221 private void testScan(String start, String stop, String last) 222 throws IOException, InterruptedException, ClassNotFoundException { 223 String jobName = 224 "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" + 225 (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); 226 LOG.info("Before map/reduce startup - job " + jobName); 227 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 228 229 c.set(KEY_STARTROW, start != null ? start : ""); 230 c.set(KEY_LASTROW, last != null ? last : ""); 231 232 List<Scan> scans = new ArrayList<>(); 233 234 for (String tableName : TABLES) { 235 Scan scan = new Scan(); 236 237 scan.addFamily(INPUT_FAMILY); 238 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)); 239 240 if (start != null) { 241 scan.setStartRow(Bytes.toBytes(start)); 242 } 243 if (stop != null) { 244 scan.setStopRow(Bytes.toBytes(stop)); 245 } 246 247 scans.add(scan); 248 249 LOG.info("scan before: " + scan); 250 } 251 252 runJob(jobName, c, scans); 253 } 254 255 protected void runJob(String jobName, Configuration c, List<Scan> scans) 256 throws IOException, InterruptedException, ClassNotFoundException { 257 Job job = new Job(c, jobName); 258 259 initJob(scans, job); 260 job.setReducerClass(ScanReducer.class); 261 job.setNumReduceTasks(1); // one to get final "first" and "last" key 262 FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); 263 LOG.info("Started " + job.getJobName()); 264 job.waitForCompletion(true); 265 assertTrue(job.isSuccessful()); 266 LOG.info("After map/reduce completion - job " + jobName); 267 } 268 269 protected abstract void initJob(List<Scan> scans, Job job) throws IOException; 270 271 272}