001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023import static org.mockito.ArgumentMatchers.any; 024import static org.mockito.Mockito.doAnswer; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.ByteArrayOutputStream; 029import java.io.PrintStream; 030import java.util.ArrayList; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.KeyValue; 040import org.apache.hadoop.hbase.MiniHBaseCluster; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.Result; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 048import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.testclassification.MapReduceTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.FSUtils; 053import org.apache.hadoop.hbase.util.LauncherSecurityManager; 054import org.apache.hadoop.hbase.wal.WAL; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.apache.hadoop.hbase.wal.WALKey; 057import org.apache.hadoop.mapreduce.Mapper; 058import org.apache.hadoop.mapreduce.Mapper.Context; 059import org.apache.hadoop.util.ToolRunner; 060import org.junit.AfterClass; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Rule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.junit.rules.TestName; 067import org.mockito.invocation.InvocationOnMock; 068import org.mockito.stubbing.Answer; 069 070/** 071 * Basic test for the WALPlayer M/R tool 072 */ 073@Category({MapReduceTests.class, LargeTests.class}) 074//TODO : Remove this in 3.0 075public class TestWALPlayer { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestWALPlayer.class); 080 081 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 082 private static MiniHBaseCluster cluster; 083 private static Path rootDir; 084 private static Path walRootDir; 085 private static FileSystem fs; 086 private static FileSystem logFs; 087 private static Configuration conf; 088 089 @Rule 090 public TestName name = new TestName(); 091 092 @BeforeClass 093 public static void beforeClass() throws Exception { 094 conf= TEST_UTIL.getConfiguration(); 095 rootDir = TEST_UTIL.createRootDir(); 096 walRootDir = TEST_UTIL.createWALRootDir(); 097 fs = FSUtils.getRootDirFileSystem(conf); 098 logFs = FSUtils.getWALFileSystem(conf); 099 cluster = TEST_UTIL.startMiniCluster(); 100 } 101 102 @AfterClass 103 public static void afterClass() throws Exception { 104 TEST_UTIL.shutdownMiniCluster(); 105 fs.delete(rootDir, true); 106 logFs.delete(walRootDir, true); 107 } 108 109 /** 110 * Simple end-to-end test 111 * @throws Exception 112 */ 113 @Test 114 public void testWALPlayer() throws Exception { 115 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); 116 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); 117 final byte[] FAMILY = Bytes.toBytes("family"); 118 final byte[] COLUMN1 = Bytes.toBytes("c1"); 119 final byte[] COLUMN2 = Bytes.toBytes("c2"); 120 final byte[] ROW = Bytes.toBytes("row"); 121 Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); 122 Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); 123 124 // put a row into the first table 125 Put p = new Put(ROW); 126 p.addColumn(FAMILY, COLUMN1, COLUMN1); 127 p.addColumn(FAMILY, COLUMN2, COLUMN2); 128 t1.put(p); 129 // delete one column 130 Delete d = new Delete(ROW); 131 d.addColumns(FAMILY, COLUMN1); 132 t1.delete(d); 133 134 // replay the WAL, map table 1 to table 2 135 WAL log = cluster.getRegionServer(0).getWAL(null); 136 log.rollWriter(); 137 String walInputDir = new Path(cluster.getMaster().getMasterFileSystem() 138 .getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString(); 139 140 Configuration configuration= TEST_UTIL.getConfiguration(); 141 WALPlayer player = new WALPlayer(configuration); 142 String optionName="_test_.name"; 143 configuration.set(optionName, "1000"); 144 player.setupTime(configuration, optionName); 145 assertEquals(1000,configuration.getLong(optionName,0)); 146 assertEquals(0, ToolRunner.run(configuration, player, 147 new String[] {walInputDir, tableName1.getNameAsString(), 148 tableName2.getNameAsString() })); 149 150 151 // verify the WAL was player into table 2 152 Get g = new Get(ROW); 153 Result r = t2.get(g); 154 assertEquals(1, r.size()); 155 assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2)); 156 } 157 158 /** 159 * Test WALKeyValueMapper setup and map 160 */ 161 @Test 162 public void testWALKeyValueMapper() throws Exception { 163 testWALKeyValueMapper(WALPlayer.TABLES_KEY); 164 } 165 166 @Test 167 public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception { 168 testWALKeyValueMapper("hlog.input.tables"); 169 } 170 171 private void testWALKeyValueMapper(final String tableConfigKey) throws Exception { 172 Configuration configuration = new Configuration(); 173 configuration.set(tableConfigKey, "table"); 174 WALKeyValueMapper mapper = new WALKeyValueMapper(); 175 WALKey key = mock(WALKey.class); 176 when(key.getTableName()).thenReturn(TableName.valueOf("table")); 177 @SuppressWarnings("unchecked") 178 Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context = mock(Context.class); 179 when(context.getConfiguration()).thenReturn(configuration); 180 181 WALEdit value = mock(WALEdit.class); 182 ArrayList<Cell> values = new ArrayList<>(); 183 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null); 184 185 values.add(kv1); 186 when(value.getCells()).thenReturn(values); 187 mapper.setup(context); 188 189 doAnswer(new Answer<Void>() { 190 191 @Override 192 public Void answer(InvocationOnMock invocation) throws Throwable { 193 ImmutableBytesWritable writer = invocation.getArgument(0); 194 KeyValue key = invocation.getArgument(1); 195 assertEquals("row", Bytes.toString(writer.get())); 196 assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); 197 return null; 198 } 199 }).when(context).write(any(), any()); 200 201 mapper.map(key, value, context); 202 203 } 204 205 /** 206 * Test main method 207 */ 208 @Test 209 public void testMainMethod() throws Exception { 210 211 PrintStream oldPrintStream = System.err; 212 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 213 LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); 214 System.setSecurityManager(newSecurityManager); 215 ByteArrayOutputStream data = new ByteArrayOutputStream(); 216 String[] args = {}; 217 System.setErr(new PrintStream(data)); 218 try { 219 System.setErr(new PrintStream(data)); 220 try { 221 WALPlayer.main(args); 222 fail("should be SecurityException"); 223 } catch (SecurityException e) { 224 assertEquals(-1, newSecurityManager.getExitCode()); 225 assertTrue(data.toString().contains("ERROR: Wrong number of arguments:")); 226 assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" + 227 " <tables> [<tableMappings>]")); 228 assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output")); 229 } 230 231 } finally { 232 System.setErr(oldPrintStream); 233 System.setSecurityManager(SECURITY_MANAGER); 234 } 235 236 } 237 238}