001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023import static org.mockito.ArgumentMatchers.any; 024import static org.mockito.Mockito.doAnswer; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.ByteArrayOutputStream; 029import java.io.File; 030import java.io.PrintStream; 031import java.util.ArrayList; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.CellUtil; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.KeyValue; 041import org.apache.hadoop.hbase.MiniHBaseCluster; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Get; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 049import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper; 050import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits; 051import org.apache.hadoop.hbase.testclassification.LargeTests; 052import org.apache.hadoop.hbase.testclassification.MapReduceTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.CommonFSUtils; 055import org.apache.hadoop.hbase.util.LauncherSecurityManager; 056import org.apache.hadoop.hbase.wal.WAL; 057import org.apache.hadoop.hbase.wal.WALEdit; 058import org.apache.hadoop.hbase.wal.WALKey; 059import org.apache.hadoop.mapreduce.Mapper; 060import org.apache.hadoop.mapreduce.Mapper.Context; 061import org.apache.hadoop.util.ToolRunner; 062import org.junit.AfterClass; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Rule; 066import org.junit.Test; 067import org.junit.experimental.categories.Category; 068import org.junit.rules.TestName; 069import org.mockito.invocation.InvocationOnMock; 070import org.mockito.stubbing.Answer; 071 072/** 073 * Basic test for the WALPlayer M/R tool 074 */ 075@Category({ MapReduceTests.class, LargeTests.class }) 076// TODO : Remove this in 3.0 077public class TestWALPlayer { 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestWALPlayer.class); 081 082 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 083 private static MiniHBaseCluster cluster; 084 private static Path rootDir; 085 private static Path walRootDir; 086 private static FileSystem fs; 087 private static FileSystem logFs; 088 private static Configuration conf; 089 090 @Rule 091 public TestName name = new TestName(); 092 093 @BeforeClass 094 public static void beforeClass() throws Exception { 095 conf = TEST_UTIL.getConfiguration(); 096 rootDir = TEST_UTIL.createRootDir(); 097 walRootDir = TEST_UTIL.createWALRootDir(); 098 fs = CommonFSUtils.getRootDirFileSystem(conf); 099 logFs = CommonFSUtils.getWALFileSystem(conf); 100 cluster = TEST_UTIL.startMiniCluster(); 101 } 102 103 @AfterClass 104 public static void afterClass() throws Exception { 105 TEST_UTIL.shutdownMiniCluster(); 106 fs.delete(rootDir, true); 107 logFs.delete(walRootDir, true); 108 } 109 110 /** 111 * Test that WALPlayer can replay recovered.edits files. 112 */ 113 @Test 114 public void testPlayingRecoveredEdit() throws Exception { 115 TableName tn = TableName.valueOf(TestRecoveredEdits.RECOVEREDEDITS_TABLENAME); 116 TEST_UTIL.createTable(tn, TestRecoveredEdits.RECOVEREDEDITS_COLUMNFAMILY); 117 // Copy testing recovered.edits file that is over under hbase-server test resources 118 // up into a dir in our little hdfs cluster here. 119 String hbaseServerTestResourcesEdits = 120 System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/" 121 + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName(); 122 assertTrue(new File(hbaseServerTestResourcesEdits).exists()); 123 FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); 124 // Target dir. 125 Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory()); 126 assertTrue(dfs.mkdirs(targetDir)); 127 dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir); 128 assertEquals(0, 129 ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() })); 130 // I don't know how many edits are in this file for this table... so just check more than 1. 131 assertTrue(TEST_UTIL.countRows(tn) > 0); 132 } 133 134 /** 135 * Simple end-to-end test 136 */ 137 @Test 138 public void testWALPlayer() throws Exception { 139 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); 140 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); 141 final byte[] FAMILY = Bytes.toBytes("family"); 142 final byte[] COLUMN1 = Bytes.toBytes("c1"); 143 final byte[] COLUMN2 = Bytes.toBytes("c2"); 144 final byte[] ROW = Bytes.toBytes("row"); 145 Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); 146 Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); 147 148 // put a row into the first table 149 Put p = new Put(ROW); 150 p.addColumn(FAMILY, COLUMN1, COLUMN1); 151 p.addColumn(FAMILY, COLUMN2, COLUMN2); 152 t1.put(p); 153 // delete one column 154 Delete d = new Delete(ROW); 155 d.addColumns(FAMILY, COLUMN1); 156 t1.delete(d); 157 158 // replay the WAL, map table 1 to table 2 159 WAL log = cluster.getRegionServer(0).getWAL(null); 160 log.rollWriter(); 161 String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), 162 HConstants.HREGION_LOGDIR_NAME).toString(); 163 164 Configuration configuration = TEST_UTIL.getConfiguration(); 165 WALPlayer player = new WALPlayer(configuration); 166 String optionName = "_test_.name"; 167 configuration.set(optionName, "1000"); 168 player.setupTime(configuration, optionName); 169 assertEquals(1000, configuration.getLong(optionName, 0)); 170 assertEquals(0, ToolRunner.run(configuration, player, 171 new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() })); 172 173 // verify the WAL was player into table 2 174 Get g = new Get(ROW); 175 Result r = t2.get(g); 176 assertEquals(1, r.size()); 177 assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2)); 178 } 179 180 /** 181 * Test WALKeyValueMapper setup and map 182 */ 183 @Test 184 public void testWALKeyValueMapper() throws Exception { 185 testWALKeyValueMapper(WALPlayer.TABLES_KEY); 186 } 187 188 @Test 189 public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception { 190 testWALKeyValueMapper("hlog.input.tables"); 191 } 192 193 private void testWALKeyValueMapper(final String tableConfigKey) throws Exception { 194 Configuration configuration = new Configuration(); 195 configuration.set(tableConfigKey, "table"); 196 WALKeyValueMapper mapper = new WALKeyValueMapper(); 197 WALKey key = mock(WALKey.class); 198 when(key.getTableName()).thenReturn(TableName.valueOf("table")); 199 @SuppressWarnings("unchecked") 200 Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context = mock(Context.class); 201 when(context.getConfiguration()).thenReturn(configuration); 202 203 WALEdit value = mock(WALEdit.class); 204 ArrayList<Cell> values = new ArrayList<>(); 205 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null); 206 207 values.add(kv1); 208 when(value.getCells()).thenReturn(values); 209 mapper.setup(context); 210 211 doAnswer(new Answer<Void>() { 212 213 @Override 214 public Void answer(InvocationOnMock invocation) throws Throwable { 215 ImmutableBytesWritable writer = invocation.getArgument(0); 216 KeyValue key = invocation.getArgument(1); 217 assertEquals("row", Bytes.toString(writer.get())); 218 assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); 219 return null; 220 } 221 }).when(context).write(any(), any()); 222 223 mapper.map(key, value, context); 224 225 } 226 227 /** 228 * Test main method 229 */ 230 @Test 231 public void testMainMethod() throws Exception { 232 233 PrintStream oldPrintStream = System.err; 234 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 235 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); 236 System.setSecurityManager(newSecurityManager); 237 ByteArrayOutputStream data = new ByteArrayOutputStream(); 238 String[] args = {}; 239 System.setErr(new PrintStream(data)); 240 try { 241 System.setErr(new PrintStream(data)); 242 try { 243 WALPlayer.main(args); 244 fail("should be SecurityException"); 245 } catch (SecurityException e) { 246 assertEquals(-1, newSecurityManager.getExitCode()); 247 assertTrue(data.toString().contains("ERROR: Wrong number of arguments:")); 248 assertTrue(data.toString() 249 .contains("Usage: WALPlayer [options] <WAL inputdir>" + " [<tables> <tableMappings>]")); 250 assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output")); 251 } 252 253 } finally { 254 System.setErr(oldPrintStream); 255 System.setSecurityManager(SECURITY_MANAGER); 256 } 257 258 } 259 260}