/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALCellMapper;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Basic test for the WALPlayer M/R tool
 */
@Category({ MapReduceTests.class, LargeTests.class })
public class TestCellBasedWALPlayer2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCellBasedWALPlayer2.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static MiniHBaseCluster cluster;
  private static Path rootDir;
  private static Path walRootDir;
  private static FileSystem fs;
  private static FileSystem logFs;
  private static Configuration conf;

  @Rule
  public TestName name = new TestName();

  /**
   * Creates the root and WAL root directories, resolves their file systems and starts the mini
   * cluster shared by all tests in this class.
   */
  @BeforeClass
  public static void beforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    rootDir = TEST_UTIL.createRootDir();
    walRootDir = TEST_UTIL.createWALRootDir();
    fs = CommonFSUtils.getRootDirFileSystem(conf);
    logFs = CommonFSUtils.getWALFileSystem(conf);
    cluster = TEST_UTIL.startMiniCluster();
  }

  /**
   * Shuts the mini cluster down and removes the directories created in {@link #beforeClass()}.
   */
  @AfterClass
  public static void afterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
    fs.delete(rootDir, true);
    logFs.delete(walRootDir, true);
  }

  /**
   * Simple end-to-end test: writes a put and a column delete to one table, rolls the WAL, replays
   * it into a second table with WALPlayer and checks that only the surviving column is present.
   */
  @Test
  public void testWALPlayer() throws Exception {
    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
    final byte[] family = Bytes.toBytes("family");
    final byte[] column1 = Bytes.toBytes("c1");
    final byte[] column2 = Bytes.toBytes("c2");
    final byte[] row = Bytes.toBytes("row");

    // Table is Closeable; release both handles even when an assertion below fails
    try (Table t1 = TEST_UTIL.createTable(tableName1, family);
      Table t2 = TEST_UTIL.createTable(tableName2, family)) {

      // put a row into the first table
      Put p = new Put(row);
      p.addColumn(family, column1, column1);
      p.addColumn(family, column2, column2);
      t1.put(p);
      // delete one column
      Delete d = new Delete(row);
      d.addColumns(family, column1);
      t1.delete(d);

      // replay the WAL, map table 1 to table 2
      WAL log = cluster.getRegionServer(0).getWAL(null);
      log.rollWriter();
      String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
        HConstants.HREGION_LOGDIR_NAME).toString();

      Configuration configuration = TEST_UTIL.getConfiguration();
      WALPlayer player = new WALPlayer(configuration);
      String optionName = "_test_.name";
      configuration.set(optionName, "1000");
      player.setupTime(configuration, optionName);
      assertEquals(1000, configuration.getLong(optionName, 0));
      assertEquals(0, ToolRunner.run(configuration, player,
        new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() }));

      // verify the WAL was played into table 2
      Get g = new Get(row);
      Result r = t2.get(g);
      assertEquals(1, r.size());
      assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], column2));
    }
  }

  /**
   * Test WALKeyValueMapper setup and map
   */
  @Test
  public void testWALKeyValueMapper() throws Exception {
    testWALKeyValueMapper(WALPlayer.TABLES_KEY);
  }

  /**
   * Same as {@link #testWALKeyValueMapper()} but selects the table through the
   * "hlog.input.tables" configuration key.
   */
  @Test
  public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception {
    testWALKeyValueMapper("hlog.input.tables");
  }

  /**
   * Runs {@link WALCellMapper} against mocked key, edit and context objects and checks that the
   * single cell of the edit is written out keyed by its row.
   * @param tableConfigKey configuration key under which the table name "table" is stored
   */
  private void testWALKeyValueMapper(final String tableConfigKey) throws Exception {
    Configuration configuration = new Configuration();
    configuration.set(tableConfigKey, "table");
    WALCellMapper mapper = new WALCellMapper();
    WALKey key = mock(WALKey.class);
    when(key.getTableName()).thenReturn(TableName.valueOf("table"));
    @SuppressWarnings("unchecked")
    Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell>.Context context = mock(Context.class);
    when(context.getConfiguration()).thenReturn(configuration);

    WALEdit value = mock(WALEdit.class);
    ArrayList<Cell> values = new ArrayList<>();
    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null);

    values.add(kv1);
    when(value.getCells()).thenReturn(values);
    mapper.setup(context);

    doAnswer(new Answer<Void>() {

      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
        MapReduceExtendedCell cell = (MapReduceExtendedCell) invocation.getArguments()[1];
        assertEquals("row", Bytes.toString(writer.get()));
        assertEquals("row", Bytes.toString(CellUtil.cloneRow(cell)));
        return null;
      }
    }).when(context).write(any(ImmutableBytesWritable.class), any(MapReduceExtendedCell.class));

    mapper.map(key, value, context);

    // The assertions above only run if write() is invoked; make sure the mapper emitted the cell
    // so the test cannot pass vacuously.
    verify(context).write(any(ImmutableBytesWritable.class), any(MapReduceExtendedCell.class));
  }

  /**
   * Test main method: with no arguments WALPlayer must print its usage to stderr and exit with
   * code -1. The exit is intercepted by a {@link LauncherSecurityManager}.
   */
  @Test
  public void testMainMethod() throws Exception {
    PrintStream oldPrintStream = System.err;
    SecurityManager oldSecurityManager = System.getSecurityManager();
    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
    System.setSecurityManager(newSecurityManager);
    ByteArrayOutputStream data = new ByteArrayOutputStream();
    String[] args = {};
    try {
      // redirect stderr inside the try so it is always restored by the finally block
      System.setErr(new PrintStream(data));
      WALPlayer.main(args);
      fail("should be SecurityException");
    } catch (SecurityException e) {
      String stderr = data.toString();
      assertEquals(-1, newSecurityManager.getExitCode());
      assertTrue(stderr.contains("ERROR: Wrong number of arguments:"));
      assertTrue(
        stderr.contains("Usage: WALPlayer [options] <WAL inputdir> [<tables> <tableMappings>]"));
      assertTrue(stderr.contains("-Dwal.bulk.output=/path/for/output"));
    } finally {
      System.setErr(oldPrintStream);
      System.setSecurityManager(oldSecurityManager);
    }
  }

}