001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024 025import java.io.IOException; 026import java.security.PrivilegedExceptionAction; 027import java.util.Arrays; 028import java.util.List; 029import java.util.Map; 030import java.util.NavigableMap; 031import java.util.TreeMap; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.CellUtil; 037import org.apache.hadoop.hbase.Coprocessor; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.RegionInfoBuilder; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.regionserver.HRegion; 051import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 052import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 053import org.apache.hadoop.hbase.security.User; 054import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 055import org.apache.hadoop.hbase.testclassification.MediumTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.hadoop.hbase.util.CommonFSUtils; 058import org.apache.hadoop.hbase.util.EnvironmentEdge; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 061import org.apache.hadoop.hbase.wal.WAL; 062import org.apache.hadoop.hbase.wal.WALEdit; 063import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 064import org.apache.hadoop.hbase.wal.WALFactory; 065import org.apache.hadoop.hbase.wal.WALKeyImpl; 066import org.apache.hadoop.hbase.wal.WALSplitter; 067import org.junit.jupiter.api.AfterAll; 068import org.junit.jupiter.api.AfterEach; 069import org.junit.jupiter.api.BeforeAll; 070import org.junit.jupiter.api.BeforeEach; 071import org.junit.jupiter.api.Tag; 072import org.junit.jupiter.api.Test; 073import org.junit.jupiter.api.TestInfo; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077/** 078 * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.MasterObserver} interface 079 * hooks at all appropriate times during normal HMaster operations. 080 */ 081@Tag(CoprocessorTests.TAG) 082@Tag(MediumTests.TAG) 083public class TestWALObserver { 084 085 private static final Logger LOG = LoggerFactory.getLogger(TestWALObserver.class); 086 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 087 088 private static byte[] TEST_TABLE = Bytes.toBytes("observedTable"); 089 private static byte[][] TEST_FAMILY = 090 { Bytes.toBytes("fam1"), Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), }; 091 private static byte[][] TEST_QUALIFIER = 092 { Bytes.toBytes("q1"), Bytes.toBytes("q2"), Bytes.toBytes("q3"), }; 093 private static byte[][] TEST_VALUE = 094 { Bytes.toBytes("v1"), Bytes.toBytes("v2"), Bytes.toBytes("v3"), }; 095 private static byte[] TEST_ROW = Bytes.toBytes("testRow"); 096 097 private String currentTestName; 098 099 private Configuration conf; 100 private FileSystem fs; 101 private Path hbaseRootDir; 102 private Path hbaseWALRootDir; 103 private Path oldLogDir; 104 private Path logDir; 105 private WALFactory wals; 106 107 @BeforeAll 108 public static void setupBeforeClass() throws Exception { 109 Configuration conf = TEST_UTIL.getConfiguration(); 110 conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 111 SampleRegionWALCoprocessor.class.getName()); 112 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 113 SampleRegionWALCoprocessor.class.getName()); 114 conf.setInt("dfs.client.block.recovery.retries", 2); 115 116 TEST_UTIL.startMiniCluster(1); 117 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); 118 Path hbaseWALRootDir = 119 TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaseLogRoot")); 120 LOG.info("hbase.rootdir=" + hbaseRootDir); 121 CommonFSUtils.setRootDir(conf, hbaseRootDir); 122 CommonFSUtils.setWALRootDir(conf, hbaseWALRootDir); 123 } 124 125 @AfterAll 126 public static void teardownAfterClass() throws Exception { 127 TEST_UTIL.shutdownMiniCluster(); 128 } 129 130 @BeforeEach 131 public void setUp(TestInfo testInfo) throws Exception { 132 currentTestName = testInfo.getTestMethod().get().getName(); 133 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 134 // this.cluster = TEST_UTIL.getDFSCluster(); 135 this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); 136 this.hbaseRootDir = CommonFSUtils.getRootDir(conf); 137 this.hbaseWALRootDir = CommonFSUtils.getWALRootDir(conf); 138 this.oldLogDir = new Path(this.hbaseWALRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 139 String serverName = 140 ServerName.valueOf(currentTestName, 16010, EnvironmentEdgeManager.currentTime()).toString(); 141 this.logDir = 142 new Path(this.hbaseWALRootDir, AbstractFSWALProvider.getWALDirectoryName(serverName)); 143 144 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { 145 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 146 } 147 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) { 148 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true); 149 } 150 this.wals = new WALFactory(conf, serverName); 151 } 152 153 @AfterEach 154 public void tearDown() throws Exception { 155 try { 156 wals.shutdown(); 157 } catch (IOException exception) { 158 // one of our tests splits out from under our wals. 159 LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage()); 160 LOG.debug("details of failure to close wal factory.", exception); 161 } 162 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 163 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true); 164 } 165 166 /** 167 * Test WAL write behavior with WALObserver. The coprocessor monitors a WALEdit written to WAL, 168 * and ignore, modify, and add KeyValue's for the WALEdit. 169 */ 170 @Test 171 public void testWALObserverWriteToWAL() throws Exception { 172 final WAL log = wals.getWAL(null); 173 verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALCoprocessor.class), false); 174 } 175 176 private void verifyWritesSeen(final WAL log, final SampleRegionWALCoprocessor cp, 177 final boolean seesLegacy) throws Exception { 178 RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE)); 179 TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE)); 180 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 181 for (byte[] fam : htd.getColumnFamilyNames()) { 182 scopes.put(fam, 0); 183 } 184 Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE)); 185 deleteDir(basedir); 186 fs.mkdirs(new Path(basedir, hri.getEncodedName())); 187 188 // TEST_FAMILY[0] shall be removed from WALEdit. 189 // TEST_FAMILY[1] value shall be changed. 190 // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put. 191 cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0], TEST_FAMILY[1], 192 TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]); 193 194 assertFalse(cp.isPreWALWriteCalled()); 195 assertFalse(cp.isPostWALWriteCalled()); 196 197 // TEST_FAMILY[2] is not in the put, however it shall be added by the tested 198 // coprocessor. 199 // Use a Put to create familyMap. 200 Put p = creatPutWith2Families(TEST_ROW); 201 202 Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap(); 203 WALEdit edit = new WALEdit(); 204 edit.add(familyMap); 205 206 boolean foundFamily0 = false; 207 boolean foundFamily2 = false; 208 boolean modifiedFamily1 = false; 209 210 List<Cell> cells = edit.getCells(); 211 212 for (Cell cell : cells) { 213 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) { 214 foundFamily0 = true; 215 } 216 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) { 217 foundFamily2 = true; 218 } 219 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) { 220 if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) { 221 modifiedFamily1 = true; 222 } 223 } 224 } 225 assertTrue(foundFamily0); 226 assertFalse(foundFamily2); 227 assertFalse(modifiedFamily1); 228 229 // it's where WAL write cp should occur. 230 long now = EnvironmentEdgeManager.currentTime(); 231 // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors. 232 long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, 233 new MultiVersionConcurrencyControl(), scopes), edit); 234 log.sync(txid); 235 236 // the edit shall have been change now by the coprocessor. 237 foundFamily0 = false; 238 foundFamily2 = false; 239 modifiedFamily1 = false; 240 for (Cell cell : cells) { 241 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) { 242 foundFamily0 = true; 243 } 244 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) { 245 foundFamily2 = true; 246 } 247 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) { 248 if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) { 249 modifiedFamily1 = true; 250 } 251 } 252 } 253 assertFalse(foundFamily0); 254 assertTrue(foundFamily2); 255 assertTrue(modifiedFamily1); 256 257 assertTrue(cp.isPreWALWriteCalled()); 258 assertTrue(cp.isPostWALWriteCalled()); 259 } 260 261 /** 262 * Coprocessors shouldn't get notice of empty waledits. 263 */ 264 @Test 265 public void testEmptyWALEditAreNotSeen() throws Exception { 266 RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE)); 267 TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE)); 268 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 269 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 270 for (byte[] fam : htd.getColumnFamilyNames()) { 271 scopes.put(fam, 0); 272 } 273 WAL log = wals.getWAL(null); 274 try { 275 SampleRegionWALCoprocessor cp = getCoprocessor(log, SampleRegionWALCoprocessor.class); 276 277 cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null); 278 279 assertFalse(cp.isPreWALWriteCalled()); 280 assertFalse(cp.isPostWALWriteCalled()); 281 282 final long now = EnvironmentEdgeManager.currentTime(); 283 long txid = log.appendData(hri, 284 new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), 285 new WALEdit()); 286 log.sync(txid); 287 288 assertFalse(cp.isPreWALWriteCalled(), "Empty WALEdit should skip coprocessor evaluation."); 289 assertFalse(cp.isPostWALWriteCalled(), "Empty WALEdit should skip coprocessor evaluation."); 290 } finally { 291 log.close(); 292 } 293 } 294 295 /** 296 * Test WAL replay behavior with WALObserver. 297 */ 298 @Test 299 public void testWALCoprocessorReplay() throws Exception { 300 // WAL replay is handled at HRegion::replayRecoveredEdits(), which is 301 // ultimately called by HRegion::initialize() 302 TableName tableName = TableName.valueOf(currentTestName); 303 TableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName); 304 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 305 // final HRegionInfo hri = 306 // createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); 307 // final HRegionInfo hri1 = 308 // createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); 309 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 310 311 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 312 deleteDir(basedir); 313 fs.mkdirs(new Path(basedir, hri.getEncodedName())); 314 315 final Configuration newConf = HBaseConfiguration.create(this.conf); 316 317 // WAL wal = new WAL(this.fs, this.dir, this.oldLogDir, this.conf); 318 WAL wal = wals.getWAL(null); 319 // Put p = creatPutWith2Families(TEST_ROW); 320 WALEdit edit = new WALEdit(); 321 long now = EnvironmentEdgeManager.currentTime(); 322 final int countPerFamily = 1000; 323 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 324 for (byte[] fam : htd.getColumnFamilyNames()) { 325 scopes.put(fam, 0); 326 } 327 for (byte[] fam : htd.getColumnFamilyNames()) { 328 addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily, 329 EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc); 330 } 331 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 332 edit); 333 // sync to fs. 334 wal.sync(); 335 336 User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime"); 337 user.runAs(new PrivilegedExceptionAction<Void>() { 338 @Override 339 public Void run() throws Exception { 340 Path p = runWALSplit(newConf); 341 LOG.info("WALSplit path == " + p); 342 // Make a new wal for new region open. 343 final WALFactory wals2 = new WALFactory(conf, ServerName 344 .valueOf(currentTestName + "2", 16010, EnvironmentEdgeManager.currentTime()).toString()); 345 WAL wal2 = wals2.getWAL(null); 346 HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, 347 htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null); 348 349 SampleRegionWALCoprocessor cp2 = 350 region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class); 351 assertNotNull(cp2); 352 assertTrue(cp2.isPreWALRestoreCalled()); 353 assertTrue(cp2.isPostWALRestoreCalled()); 354 assertTrue(cp2.isPreReplayWALsCalled()); 355 assertTrue(cp2.isPostReplayWALsCalled()); 356 region.close(); 357 wals2.close(); 358 return null; 359 } 360 }); 361 } 362 363 /** 364 * Test to see CP loaded successfully or not. There is a duplication at TestHLog, but the purpose 365 * of that one is to see whether the loaded CP will impact existing WAL tests or not. 366 */ 367 @Test 368 public void testWALObserverLoaded() throws Exception { 369 WAL log = wals.getWAL(null); 370 assertNotNull(getCoprocessor(log, SampleRegionWALCoprocessor.class)); 371 } 372 373 @Test 374 public void testWALObserverRoll() throws Exception { 375 final WAL wal = wals.getWAL(null); 376 final SampleRegionWALCoprocessor cp = getCoprocessor(wal, SampleRegionWALCoprocessor.class); 377 cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null); 378 379 assertFalse(cp.isPreWALRollCalled()); 380 assertFalse(cp.isPostWALRollCalled()); 381 382 wal.rollWriter(true); 383 assertTrue(cp.isPreWALRollCalled()); 384 assertTrue(cp.isPostWALRollCalled()); 385 } 386 387 private SampleRegionWALCoprocessor getCoprocessor(WAL wal, 388 Class<? extends SampleRegionWALCoprocessor> clazz) throws Exception { 389 WALCoprocessorHost host = wal.getCoprocessorHost(); 390 Coprocessor c = host.findCoprocessor(clazz.getName()); 391 return (SampleRegionWALCoprocessor) c; 392 } 393 394 /** 395 * Creates an HRI around an HTD that has <code>tableName</code>. 396 * @param tableName Name of table to use. 397 */ 398 private RegionInfo createBasicHRegionInfo(String tableName) { 399 return RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 400 } 401 402 /* 403 * @param p Directory to cleanup 404 */ 405 private void deleteDir(final Path p) throws IOException { 406 if (this.fs.exists(p)) { 407 if (!this.fs.delete(p, true)) { 408 throw new IOException("Failed remove of " + p); 409 } 410 } 411 } 412 413 private Put creatPutWith2Families(byte[] row) throws IOException { 414 Put p = new Put(row); 415 for (int i = 0; i < TEST_FAMILY.length - 1; i++) { 416 p.addColumn(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]); 417 } 418 return p; 419 } 420 421 private Path runWALSplit(final Configuration c) throws IOException { 422 List<Path> splits = 423 WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); 424 // Split should generate only 1 file since there's only 1 region 425 assertEquals(1, splits.size()); 426 // Make sure the file exists 427 assertTrue(fs.exists(splits.get(0))); 428 LOG.info("Split file=" + splits.get(0)); 429 return splits.get(0); 430 } 431 432 private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName, 433 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, 434 final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc) 435 throws IOException { 436 String familyStr = Bytes.toString(family); 437 long txid = -1; 438 for (int j = 0; j < count; j++) { 439 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); 440 byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); 441 WALEdit edit = new WALEdit(); 442 WALEditInternalHelper.addExtendedCell(edit, 443 new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); 444 // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care 445 // about legacy coprocessors 446 txid = wal.appendData(hri, 447 new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit); 448 } 449 if (-1 != txid) { 450 wal.sync(txid); 451 } 452 } 453 454 private TableDescriptor getBasic3FamilyHTableDescriptor(TableName tableName) { 455 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 456 Arrays.stream(TEST_FAMILY).map(ColumnFamilyDescriptorBuilder::of) 457 .forEachOrdered(builder::setColumnFamily); 458 return builder.build(); 459 } 460 461 private TableDescriptor createBasic3FamilyHTD(String tableName) { 462 return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) 463 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("a")) 464 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("b")) 465 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("c")).build(); 466 } 467}