/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that a map-only job driven by a time-range scan only sees cells whose timestamps
 * fall inside [MINSTAMP, MAXSTAMP): the mapper flips every cell it is handed to {@code true},
 * and the test then checks the resulting values against the expected flags in TIMESTAMP.
 */
@Category({MapReduceTests.class, LargeTests.class})
public class TestTimeRangeMapRed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTimeRangeMapRed.class);

  private final static Logger log = LoggerFactory.getLogger(TestTimeRangeMapRed.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private Admin admin;

  private static final byte[] KEY = Bytes.toBytes("row1");
  private static final NavigableMap<Long, Boolean> TIMESTAMP = new TreeMap<>();
  static {
    TIMESTAMP.put((long)1245620000, false);
    TIMESTAMP.put((long)1245620005, true); // include
    TIMESTAMP.put((long)1245620010, true); // include
    TIMESTAMP.put((long)1245620055, true); // include
    TIMESTAMP.put((long)1245620100, true); // include
    TIMESTAMP.put((long)1245620150, false);
    TIMESTAMP.put((long)1245620250, false);
  }
  static final long MINSTAMP = 1245620005;
  static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded, so increment it.

  static final TableName TABLE_NAME = TableName.valueOf("table123");
  static final byte[] FAMILY_NAME = Bytes.toBytes("text");
  static final byte[] COLUMN_NAME = Bytes.toBytes("input");

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws Exception {
    this.admin = UTIL.getAdmin();
  }

  private static class ProcessTimeRangeMapper
      extends TableMapper<ImmutableBytesWritable, MapWritable>
      implements Configurable {

    private Configuration conf = null;
    private Connection connection = null;
    private Table table = null;

    @Override
    public void map(ImmutableBytesWritable key, Result result, Context context)
        throws IOException {
      // Record the timestamp of every cell the time-range scan handed us...
      List<Long> tsList = new ArrayList<>();
      for (Cell kv : result.listCells()) {
        tsList.add(kv.getTimestamp());
      }

      // ...and rewrite the value at each of those timestamps from false to true.
      List<Put> puts = new ArrayList<>();
      for (Long ts : tsList) {
        Put put = new Put(key.get());
        put.setDurability(Durability.SKIP_WAL);
        put.addColumn(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
        puts.add(put);
      }
      table.put(puts);
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration configuration) {
      this.conf = configuration;
      try {
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TABLE_NAME);
      } catch (IOException e) {
        // Fail fast; swallowing the exception would leave table null and NPE later in map().
        throw new RuntimeException(e);
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      // Release the table and connection opened in setConf().
      if (table != null) {
        table.close();
      }
      if (connection != null) {
        connection.close();
      }
    }
  }

  @Test
  public void testTimeRangeMapRed()
      throws IOException, InterruptedException, ClassNotFoundException {
    final HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
    final HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME);
    col.setMaxVersions(Integer.MAX_VALUE);
    desc.addFamily(col);
    admin.createTable(desc);
    // Seed every timestamp with false; the mapper should flip only the in-range ones to true.
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
      Put put = new Put(KEY);
      put.setDurability(Durability.SKIP_WAL);
      put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
      puts.add(put);
    }
    Table table = UTIL.getConnection().getTable(desc.getTableName());
    table.put(puts);
    runTestOnTable();
    verify(table);
    table.close();
  }

  private void runTestOnTable()
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = null;
    try {
      job = Job.getInstance(UTIL.getConfiguration(), "test123");
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setNumReduceTasks(0);
      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_NAME);
      scan.setTimeRange(MINSTAMP, MAXSTAMP);
      scan.setMaxVersions();
      TableMapReduceUtil.initTableMapperJob(TABLE_NAME,
        scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job);
      // Let any job failure propagate and fail the test instead of printing and swallowing it.
      job.waitForCompletion(true);
    } finally {
      if (job != null) {
        FileUtil.fullyDelete(
          new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }
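
  /**
   * Reads the column back (newest version of each cell only) and asserts that every returned
   * value matches the expected flag recorded for its timestamp in TIMESTAMP.
   */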
  private void verify(final Table table) throws IOException {
    Scan scan = new Scan();
    scan.addColumn(FAMILY_NAME, COLUMN_NAME);
    scan.setMaxVersions(1);
    ResultScanner scanner = table.getScanner(scan);
    for (Result r : scanner) {
      for (Cell kv : r.listCells()) {
        log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv))
          + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv))
          + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(CellUtil.cloneValue(kv)));
        org.junit.Assert.assertEquals(TIMESTAMP.get(kv.getTimestamp()),
          Bytes.toBoolean(CellUtil.cloneValue(kv)));
      }
    }
    scanner.close();
  }

}