/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that a time-range restricted {@link Scan} fed into a MapReduce job only exposes cells
 * whose timestamps fall inside {@code [MINSTAMP, MAXSTAMP)}. The test seeds one row with cells at
 * several timestamps (all valued {@code false}), runs a mapper that rewrites every cell it sees to
 * {@code true}, and then checks that exactly the expected timestamps were rewritten.
 */
@Tag(MapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestTimeRangeMapRed {

  private static final Logger LOG = LoggerFactory.getLogger(TestTimeRangeMapRed.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private Admin admin;

  private static final byte[] KEY = Bytes.toBytes("row1");

  // Timestamp -> expected final cell value: true iff the timestamp is inside the scan's
  // time range and the mapper is therefore expected to have rewritten the cell.
  private static final NavigableMap<Long, Boolean> TIMESTAMP = new TreeMap<>();
  static {
    TIMESTAMP.put(1245620000L, false);
    TIMESTAMP.put(1245620005L, true); // include
    TIMESTAMP.put(1245620010L, true); // include
    TIMESTAMP.put(1245620055L, true); // include
    TIMESTAMP.put(1245620100L, true); // include
    TIMESTAMP.put(1245620150L, false);
    TIMESTAMP.put(1245620250L, false);
  }
  static final long MINSTAMP = 1245620005;
  static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded. so increment it.

  static final TableName TABLE_NAME = TableName.valueOf("table123");
  static final byte[] FAMILY_NAME = Bytes.toBytes("text");
  static final byte[] COLUMN_NAME = Bytes.toBytes("input");

  @BeforeAll
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterAll
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @BeforeEach
  public void before() throws Exception {
    this.admin = UTIL.getAdmin();
  }

  /**
   * Mapper that overwrites every cell it receives with the value {@code true} at the cell's own
   * timestamp. Because the job's scan is time-range restricted, only in-range cells get rewritten.
   */
  private static class ProcessTimeRangeMapper
    extends TableMapper<ImmutableBytesWritable, MapWritable> implements Configurable {

    private Configuration conf = null;
    private Connection connection = null;
    private Table table = null;

    @Override
    public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException {
      // For each cell the time-range scan surfaced, write back `true` at the same timestamp.
      List<Put> puts = new ArrayList<>();
      for (Cell cell : result.listCells()) {
        Put put = new Put(key.get());
        put.setDurability(Durability.SKIP_WAL);
        put.addColumn(FAMILY_NAME, COLUMN_NAME, cell.getTimestamp(), Bytes.toBytes(true));
        puts.add(put);
      }
      table.put(puts);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      // Close what setConf() opened; previously the Connection was dropped on the floor and
      // neither it nor the Table was ever closed (resource leak).
      if (table != null) {
        table.close();
      }
      if (connection != null) {
        connection.close();
      }
      super.cleanup(context);
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration configuration) {
      this.conf = configuration;
      try {
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TABLE_NAME);
      } catch (IOException e) {
        // Configurable.setConf cannot throw a checked exception; fail fast instead of
        // swallowing the error and NPE-ing later in map().
        throw new RuntimeException("Failed to open table " + TABLE_NAME, e);
      }
    }
  }

  @Test
  public void testTimeRangeMapRed()
    throws IOException, InterruptedException, ClassNotFoundException {
    // Keep all versions so each timestamped cell survives independently.
    final TableDescriptor tableDescriptor =
      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(FAMILY_NAME).setMaxVersions(Integer.MAX_VALUE).build()).build();
    admin.createTable(tableDescriptor);
    // Seed one cell per timestamp, all valued `false`; the MR job flips the in-range ones.
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
      Put put = new Put(KEY);
      put.setDurability(Durability.SKIP_WAL);
      put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
      puts.add(put);
    }
    // try-with-resources: the table was previously leaked if runTestOnTable()/verify() threw.
    try (Table table = UTIL.getConnection().getTable(tableDescriptor.getTableName())) {
      table.put(puts);
      runTestOnTable();
      verify(table);
    }
  }

  /**
   * Runs the time-range MapReduce job against {@link #TABLE_NAME} and asserts it succeeds.
   * Hadoop temp output is removed in all cases.
   */
  private void runTestOnTable() throws IOException, InterruptedException, ClassNotFoundException {
    Job job = null;
    try {
      job = Job.getInstance(UTIL.getConfiguration(), "test123");
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setNumReduceTasks(0);
      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_NAME);
      scan.setTimeRange(MINSTAMP, MAXSTAMP);
      scan.readAllVersions();
      TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, ProcessTimeRangeMapper.class,
        Text.class, Text.class, job);
      // Previously the boolean result was ignored and IOException was swallowed with
      // printStackTrace(), which could let the test "pass" setup failures silently.
      assertTrue(job.waitForCompletion(true), "time-range MR job should complete successfully");
    } finally {
      if (job != null) {
        FileUtil.fullyDelete(new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

  /**
   * Scans the latest version of each cell and asserts its value matches the expectation recorded
   * in {@link #TIMESTAMP}: {@code true} for in-range timestamps, {@code false} otherwise.
   */
  private void verify(final Table table) throws IOException {
    Scan scan = new Scan();
    scan.addColumn(FAMILY_NAME, COLUMN_NAME);
    scan.readVersions(1);
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result r : scanner) {
        for (Cell cell : r.listCells()) {
          LOG.debug("{}\t{}\t{}\t{}\t{}", Bytes.toString(r.getRow()),
            Bytes.toString(CellUtil.cloneFamily(cell)),
            Bytes.toString(CellUtil.cloneQualifier(cell)), cell.getTimestamp(),
            Bytes.toBoolean(CellUtil.cloneValue(cell)));
          assertEquals(TIMESTAMP.get(cell.getTimestamp()),
            Bytes.toBoolean(CellUtil.cloneValue(cell)));
        }
      }
    }
  }

}