001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.hamcrest.CoreMatchers.equalTo; 021import static org.hamcrest.CoreMatchers.notNullValue; 022import static org.hamcrest.CoreMatchers.nullValue; 023import static org.hamcrest.MatcherAssert.assertThat; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026import static org.junit.jupiter.api.Assertions.fail; 027import static org.mockito.ArgumentMatchers.any; 028import static org.mockito.Mockito.doAnswer; 029import static org.mockito.Mockito.mock; 030import static org.mockito.Mockito.when; 031 032import java.io.ByteArrayOutputStream; 033import java.io.File; 034import java.io.PrintStream; 035import java.util.ArrayList; 036import java.util.concurrent.ThreadLocalRandom; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellUtil; 042import org.apache.hadoop.hbase.HBaseTestingUtil; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.Delete; 048import org.apache.hadoop.hbase.client.Get; 049import org.apache.hadoop.hbase.client.Put; 050import org.apache.hadoop.hbase.client.Result; 051import org.apache.hadoop.hbase.client.Table; 052import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 053import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper; 054import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.apache.hadoop.hbase.testclassification.MapReduceTests; 057import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.CommonFSUtils; 060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 061import org.apache.hadoop.hbase.util.LauncherSecurityManager; 062import org.apache.hadoop.hbase.util.MapReduceExtendedCell; 063import org.apache.hadoop.hbase.wal.WAL; 064import org.apache.hadoop.hbase.wal.WALEdit; 065import org.apache.hadoop.hbase.wal.WALKey; 066import org.apache.hadoop.io.WritableComparable; 067import org.apache.hadoop.mapreduce.Mapper; 068import org.apache.hadoop.mapreduce.Mapper.Context; 069import org.apache.hadoop.util.ToolRunner; 070import org.junit.jupiter.api.AfterAll; 071import org.junit.jupiter.api.BeforeAll; 072import org.junit.jupiter.api.Tag; 073import org.junit.jupiter.api.Test; 074import org.junit.jupiter.api.TestInfo; 075import org.mockito.invocation.InvocationOnMock; 076import org.mockito.stubbing.Answer; 077 078/** 079 * Basic test for the WALPlayer M/R tool 080 */ 081@Tag(MapReduceTests.TAG) 082@Tag(LargeTests.TAG) 083public class TestWALPlayer { 084 085 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 086 private static SingleProcessHBaseCluster cluster; 087 private static Path rootDir; 088 private static Path walRootDir; 089 private static FileSystem fs; 090 private static FileSystem logFs; 091 private static Configuration conf; 092 093 @BeforeAll 094 public static void beforeClass() throws Exception { 095 conf = TEST_UTIL.getConfiguration(); 096 rootDir = TEST_UTIL.createRootDir(); 097 walRootDir = TEST_UTIL.createWALRootDir(); 098 fs = CommonFSUtils.getRootDirFileSystem(conf); 099 logFs = CommonFSUtils.getWALFileSystem(conf); 100 cluster = TEST_UTIL.startMiniCluster(); 101 } 102 103 @AfterAll 104 public static void afterClass() throws Exception { 105 TEST_UTIL.shutdownMiniCluster(); 106 fs.delete(rootDir, true); 107 logFs.delete(walRootDir, true); 108 } 109 110 /** 111 * Test that WALPlayer can replay recovered.edits files. 112 */ 113 @Test 114 public void testPlayingRecoveredEdit() throws Exception { 115 TableName tn = TableName.valueOf(TestRecoveredEdits.RECOVEREDEDITS_TABLENAME); 116 TEST_UTIL.createTable(tn, TestRecoveredEdits.RECOVEREDEDITS_COLUMNFAMILY); 117 // Copy testing recovered.edits file that is over under hbase-server test resources 118 // up into a dir in our little hdfs cluster here. 119 runWithDiskBasedSortingDisabledAndEnabled(() -> { 120 String hbaseServerTestResourcesEdits = 121 System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/" 122 + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName(); 123 assertTrue(new File(hbaseServerTestResourcesEdits).exists()); 124 FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); 125 // Target dir. 126 Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory()); 127 assertTrue(dfs.mkdirs(targetDir)); 128 dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir); 129 assertEquals(0, 130 ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() })); 131 // I don't know how many edits are in this file for this table... so just check more than 1. 132 assertTrue(TEST_UTIL.countRows(tn) > 0); 133 dfs.delete(targetDir, true); 134 }); 135 } 136 137 /** 138 * Tests that when you write multiple cells with the same timestamp they are properly sorted by 139 * their sequenceId in WALPlayer/CellSortReducer so that the correct one wins when querying from 140 * the resulting bulkloaded HFiles. See HBASE-27649 141 */ 142 @Test 143 public void testWALPlayerBulkLoadWithOverriddenTimestamps(TestInfo testInfo) throws Exception { 144 final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName() + "1"); 145 final byte[] family = Bytes.toBytes("family"); 146 final byte[] column1 = Bytes.toBytes("c1"); 147 final byte[] column2 = Bytes.toBytes("c2"); 148 final byte[] row = Bytes.toBytes("row"); 149 final Table table = TEST_UTIL.createTable(tableName, family); 150 151 long now = EnvironmentEdgeManager.currentTime(); 152 // put a row into the first table 153 Put p = new Put(row); 154 p.addColumn(family, column1, now, column1); 155 p.addColumn(family, column2, now, column2); 156 157 table.put(p); 158 159 byte[] lastVal = null; 160 161 for (int i = 0; i < 50; i++) { 162 lastVal = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); 163 p = new Put(row); 164 p.addColumn(family, column1, now, lastVal); 165 166 table.put(p); 167 168 // wal rolling is necessary to trigger the bug. otherwise no sorting 169 // needs to occur in the reducer because it's all sorted and coming from a single file. 170 if (i % 10 == 0) { 171 WAL log = cluster.getRegionServer(0).getWAL(null); 172 log.rollWriter(); 173 } 174 } 175 176 WAL log = cluster.getRegionServer(0).getWAL(null); 177 log.rollWriter(); 178 String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), 179 HConstants.HREGION_LOGDIR_NAME).toString(); 180 181 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration()); 182 String outPath = "/tmp/" + testInfo.getTestMethod().get().getName(); 183 configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath); 184 configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); 185 186 WALPlayer player = new WALPlayer(configuration); 187 final byte[] finalLastVal = lastVal; 188 189 runWithDiskBasedSortingDisabledAndEnabled(() -> { 190 assertEquals(0, ToolRunner.run(configuration, player, 191 new String[] { walInputDir, tableName.getNameAsString() })); 192 193 Get g = new Get(row); 194 Result result = table.get(g); 195 byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); 196 assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal))); 197 198 TEST_UTIL.truncateTable(tableName); 199 g = new Get(row); 200 result = table.get(g); 201 assertThat(result.listCells(), nullValue()); 202 203 BulkLoadHFiles.create(configuration).bulkLoad(tableName, 204 new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString())); 205 206 g = new Get(row); 207 result = table.get(g); 208 value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); 209 210 assertThat(result.listCells(), notNullValue()); 211 assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal))); 212 213 // cleanup 214 Path out = new Path(outPath); 215 FileSystem fs = out.getFileSystem(configuration); 216 assertTrue(fs.delete(out, true)); 217 }); 218 } 219 220 /** 221 * Simple end-to-end test 222 */ 223 @Test 224 public void testWALPlayer(TestInfo testInfo) throws Exception { 225 final TableName tableName1 = TableName.valueOf(testInfo.getTestMethod().get().getName() + "1"); 226 final TableName tableName2 = TableName.valueOf(testInfo.getTestMethod().get().getName() + "2"); 227 final byte[] FAMILY = Bytes.toBytes("family"); 228 final byte[] COLUMN1 = Bytes.toBytes("c1"); 229 final byte[] COLUMN2 = Bytes.toBytes("c2"); 230 final byte[] ROW = Bytes.toBytes("row"); 231 Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); 232 Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); 233 234 // put a row into the first table 235 Put p = new Put(ROW); 236 p.addColumn(FAMILY, COLUMN1, COLUMN1); 237 p.addColumn(FAMILY, COLUMN2, COLUMN2); 238 t1.put(p); 239 // delete one column 240 Delete d = new Delete(ROW); 241 d.addColumns(FAMILY, COLUMN1); 242 t1.delete(d); 243 244 // replay the WAL, map table 1 to table 2 245 WAL log = cluster.getRegionServer(0).getWAL(null); 246 log.rollWriter(); 247 String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), 248 HConstants.HREGION_LOGDIR_NAME).toString(); 249 250 Configuration configuration = TEST_UTIL.getConfiguration(); 251 WALPlayer player = new WALPlayer(configuration); 252 253 runWithDiskBasedSortingDisabledAndEnabled(() -> { 254 String optionName = "_test_.name"; 255 configuration.set(optionName, "1000"); 256 player.setupTime(configuration, optionName); 257 assertEquals(1000, configuration.getLong(optionName, 0)); 258 assertEquals(0, ToolRunner.run(configuration, player, 259 new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() })); 260 261 // verify the WAL was player into table 2 262 Get g = new Get(ROW); 263 Result r = t2.get(g); 264 assertEquals(1, r.size()); 265 assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2)); 266 }); 267 } 268 269 /** 270 * Test WALKeyValueMapper setup and map 271 */ 272 @Test 273 public void testWALKeyValueMapper() throws Exception { 274 testWALKeyValueMapper(WALPlayer.TABLES_KEY); 275 } 276 277 @Test 278 public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception { 279 testWALKeyValueMapper("hlog.input.tables"); 280 } 281 282 private void testWALKeyValueMapper(final String tableConfigKey) throws Exception { 283 Configuration configuration = new Configuration(); 284 configuration.set(tableConfigKey, "table"); 285 WALKeyValueMapper mapper = new WALKeyValueMapper(); 286 WALKey key = mock(WALKey.class); 287 when(key.getTableName()).thenReturn(TableName.valueOf("table")); 288 @SuppressWarnings("unchecked") 289 Mapper<WALKey, WALEdit, WritableComparable<?>, Cell>.Context context = mock(Context.class); 290 when(context.getConfiguration()).thenReturn(configuration); 291 292 WALEdit value = mock(WALEdit.class); 293 ArrayList<Cell> values = new ArrayList<>(); 294 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null); 295 296 values.add(kv1); 297 when(value.getCells()).thenReturn(values); 298 mapper.setup(context); 299 300 doAnswer(new Answer<Void>() { 301 302 @Override 303 public Void answer(InvocationOnMock invocation) throws Throwable { 304 ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0); 305 MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1); 306 assertEquals("row", Bytes.toString(writer.get())); 307 assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); 308 return null; 309 } 310 }).when(context).write(any(), any()); 311 312 mapper.map(key, value, context); 313 314 } 315 316 /** 317 * Test main method 318 */ 319 @Test 320 public void testMainMethod() throws Exception { 321 322 PrintStream oldPrintStream = System.err; 323 SecurityManager SECURITY_MANAGER = System.getSecurityManager(); 324 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); 325 System.setSecurityManager(newSecurityManager); 326 ByteArrayOutputStream data = new ByteArrayOutputStream(); 327 String[] args = {}; 328 System.setErr(new PrintStream(data)); 329 try { 330 System.setErr(new PrintStream(data)); 331 try { 332 WALPlayer.main(args); 333 fail("should be SecurityException"); 334 } catch (SecurityException e) { 335 assertEquals(-1, newSecurityManager.getExitCode()); 336 assertTrue(data.toString().contains("ERROR: Wrong number of arguments:")); 337 assertTrue(data.toString() 338 .contains("Usage: WALPlayer [options] <WAL inputdir>" + " [<tables> <tableMappings>]")); 339 assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output")); 340 } 341 342 } finally { 343 System.setErr(oldPrintStream); 344 System.setSecurityManager(SECURITY_MANAGER); 345 } 346 } 347 348 private static void runWithDiskBasedSortingDisabledAndEnabled(TestMethod method) 349 throws Exception { 350 TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, 351 false); 352 try { 353 method.run(); 354 } finally { 355 TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY); 356 } 357 358 TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, 359 true); 360 try { 361 method.run(); 362 } finally { 363 TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY); 364 } 365 } 366 367 private interface TestMethod { 368 void run() throws Exception; 369 } 370 371}