/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test that a MapReduce job configured with a time-range {@link Scan} only hands the mapper
 * cells whose timestamps fall within [MINSTAMP, MAXSTAMP). The mapper rewrites every cell it
 * sees with the value {@code true}, so after the job runs only the in-range timestamps should
 * read back as {@code true}.
 */
@Category({ MapReduceTests.class, LargeTests.class })
public class TestTimeRangeMapRed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTimeRangeMapRed.class);

  private static final Logger log = LoggerFactory.getLogger(TestTimeRangeMapRed.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private Admin admin;

  private static final byte[] KEY = Bytes.toBytes("row1");
  // Each timestamp maps to the value expected after the job runs: true for timestamps inside
  // the scan's time range, false for those outside it.
  private static final NavigableMap<Long, Boolean> TIMESTAMP = new TreeMap<>();
  static {
    TIMESTAMP.put(1245620000L, false);
    TIMESTAMP.put(1245620005L, true); // include
    TIMESTAMP.put(1245620010L, true); // include
    TIMESTAMP.put(1245620055L, true); // include
    TIMESTAMP.put(1245620100L, true); // include
    TIMESTAMP.put(1245620150L, false);
    TIMESTAMP.put(1245620250L, false);
  }
  static final long MINSTAMP = 1245620005;
  static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded, so increment it

  static final TableName TABLE_NAME = TableName.valueOf("table123");
  static final byte[] FAMILY_NAME = Bytes.toBytes("text");
  static final byte[] COLUMN_NAME = Bytes.toBytes("input");

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws Exception {
    this.admin = UTIL.getAdmin();
  }

  /**
   * Rewrites every cell it receives with the value {@code true}, keyed by the cell's original
   * timestamp. Only cells inside the scan's time range ever reach this mapper.
   */
  private static class ProcessTimeRangeMapper
    extends TableMapper<ImmutableBytesWritable, MapWritable> implements Configurable {

    private Configuration conf = null;
    private Connection connection = null;
    private Table table = null;

    @Override
    public void map(ImmutableBytesWritable key, Result result, Context context)
      throws IOException {
      List<Long> tsList = new ArrayList<>();
      for (Cell kv : result.listCells()) {
        tsList.add(kv.getTimestamp());
      }

      List<Put> puts = new ArrayList<>();
      for (Long ts : tsList) {
        Put put = new Put(key.get());
        put.setDurability(Durability.SKIP_WAL);
        put.addColumn(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
        puts.add(put);
      }
      table.put(puts);
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration configuration) {
      this.conf = configuration;
      try {
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TABLE_NAME);
      } catch (IOException e) {
        throw new RuntimeException("Failed to connect to table " + TABLE_NAME, e);
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException {
      // Release the table and connection opened in setConf.
      if (table != null) {
        table.close();
      }
      if (connection != null) {
        connection.close();
      }
    }
  }

  @Test
  public void testTimeRangeMapRed()
    throws IOException, InterruptedException, ClassNotFoundException {
    final HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
    final HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME);
    col.setMaxVersions(Integer.MAX_VALUE);
    desc.addFamily(col);
    admin.createTable(desc);
    // Seed every timestamp with false; the job should flip only the in-range ones to true.
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
      Put put = new Put(KEY);
      put.setDurability(Durability.SKIP_WAL);
      put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
      puts.add(put);
    }
    Table table = UTIL.getConnection().getTable(desc.getTableName());
    table.put(puts);
    runTestOnTable();
    verify(table);
    table.close();
  }

  private void runTestOnTable() throws IOException, InterruptedException, ClassNotFoundException {
    Job job = null;
    try {
      job = Job.getInstance(UTIL.getConfiguration(), "test123");
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setNumReduceTasks(0);
      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_NAME);
      scan.setTimeRange(MINSTAMP, MAXSTAMP);
      scan.setMaxVersions();
      TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ProcessTimeRangeMapper.class,
        Text.class, Text.class, job);
      job.waitForCompletion(true);
    } finally {
      if (job != null) {
        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

  private void verify(final Table table) throws IOException {
    Scan scan = new Scan();
    scan.addColumn(FAMILY_NAME, COLUMN_NAME);
    scan.setMaxVersions(1);
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result r : scanner) {
        for (Cell kv : r.listCells()) {
          log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv))
            + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv)) + "\t" + kv.getTimestamp() + "\t"
            + Bytes.toBoolean(CellUtil.cloneValue(kv)));
          assertEquals(TIMESTAMP.get(kv.getTimestamp()),
            Bytes.toBoolean(CellUtil.cloneValue(kv)));
        }
      }
    }
  }

}