001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.security.PrivilegedExceptionAction; 027import java.util.Arrays; 028import java.util.List; 029import java.util.Map; 030import java.util.NavigableMap; 031import java.util.TreeMap; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.CellUtil; 037import org.apache.hadoop.hbase.Coprocessor; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseConfiguration; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.RegionInfoBuilder; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.regionserver.HRegion; 052import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 053import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 054import org.apache.hadoop.hbase.security.User; 055import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.CommonFSUtils; 059import org.apache.hadoop.hbase.util.EnvironmentEdge; 060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 061import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 062import org.apache.hadoop.hbase.wal.WAL; 063import org.apache.hadoop.hbase.wal.WALEdit; 064import org.apache.hadoop.hbase.wal.WALFactory; 065import org.apache.hadoop.hbase.wal.WALKeyImpl; 066import org.apache.hadoop.hbase.wal.WALSplitter; 067import org.junit.After; 068import org.junit.AfterClass; 069import org.junit.Before; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Rule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.junit.rules.TestName; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079/** 080 * Tests invocation of the 081 * {@link org.apache.hadoop.hbase.coprocessor.MasterObserver} interface hooks at 082 * all appropriate times during normal HMaster operations. 083 */ 084@Category({CoprocessorTests.class, MediumTests.class}) 085public class TestWALObserver { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestWALObserver.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestWALObserver.class); 092 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 093 094 private static byte[] TEST_TABLE = Bytes.toBytes("observedTable"); 095 private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"), 096 Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), }; 097 private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"), 098 Bytes.toBytes("q2"), Bytes.toBytes("q3"), }; 099 private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"), 100 Bytes.toBytes("v2"), Bytes.toBytes("v3"), }; 101 private static byte[] TEST_ROW = Bytes.toBytes("testRow"); 102 103 @Rule 104 public TestName currentTest = new TestName(); 105 106 private Configuration conf; 107 private FileSystem fs; 108 private Path hbaseRootDir; 109 private Path hbaseWALRootDir; 110 private Path oldLogDir; 111 private Path logDir; 112 private WALFactory wals; 113 114 @BeforeClass 115 public static void setupBeforeClass() throws Exception { 116 Configuration conf = TEST_UTIL.getConfiguration(); 117 conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 118 SampleRegionWALCoprocessor.class.getName()); 119 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 120 SampleRegionWALCoprocessor.class.getName()); 121 conf.setInt("dfs.client.block.recovery.retries", 2); 122 123 TEST_UTIL.startMiniCluster(1); 124 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem() 125 .makeQualified(new Path("/hbase")); 126 Path hbaseWALRootDir = TEST_UTIL.getDFSCluster().getFileSystem() 127 .makeQualified(new Path("/hbaseLogRoot")); 128 LOG.info("hbase.rootdir=" + hbaseRootDir); 129 CommonFSUtils.setRootDir(conf, hbaseRootDir); 130 CommonFSUtils.setWALRootDir(conf, hbaseWALRootDir); 131 } 132 133 @AfterClass 134 public static void teardownAfterClass() throws Exception { 135 TEST_UTIL.shutdownMiniCluster(); 136 } 137 138 @Before 139 public void setUp() throws Exception { 140 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 141 // this.cluster = TEST_UTIL.getDFSCluster(); 142 this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); 143 this.hbaseRootDir = CommonFSUtils.getRootDir(conf); 144 this.hbaseWALRootDir = CommonFSUtils.getWALRootDir(conf); 145 this.oldLogDir = new Path(this.hbaseWALRootDir, 146 HConstants.HREGION_OLDLOGDIR_NAME); 147 String serverName = ServerName.valueOf(currentTest.getMethodName(), 16010, 148 System.currentTimeMillis()).toString(); 149 this.logDir = new Path(this.hbaseWALRootDir, 150 AbstractFSWALProvider.getWALDirectoryName(serverName)); 151 152 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { 153 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 154 } 155 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) { 156 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true); 157 } 158 this.wals = new WALFactory(conf, serverName); 159 } 160 161 @After 162 public void tearDown() throws Exception { 163 try { 164 wals.shutdown(); 165 } catch (IOException exception) { 166 // one of our tests splits out from under our wals. 167 LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage()); 168 LOG.debug("details of failure to close wal factory.", exception); 169 } 170 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 171 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true); 172 } 173 174 /** 175 * Test WAL write behavior with WALObserver. The coprocessor monitors a 176 * WALEdit written to WAL, and ignore, modify, and add KeyValue's for the 177 * WALEdit. 178 */ 179 @Test 180 public void testWALObserverWriteToWAL() throws Exception { 181 final WAL log = wals.getWAL(null); 182 verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALCoprocessor.class), false); 183 } 184 185 private void verifyWritesSeen(final WAL log, final SampleRegionWALCoprocessor cp, 186 final boolean seesLegacy) throws Exception { 187 RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE)); 188 TableDescriptor htd = createBasic3FamilyHTD(Bytes 189 .toString(TEST_TABLE)); 190 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 191 for (byte[] fam : htd.getColumnFamilyNames()) { 192 scopes.put(fam, 0); 193 } 194 Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE)); 195 deleteDir(basedir); 196 fs.mkdirs(new Path(basedir, hri.getEncodedName())); 197 198 // TEST_FAMILY[0] shall be removed from WALEdit. 199 // TEST_FAMILY[1] value shall be changed. 200 // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put. 201 cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0], 202 TEST_FAMILY[1], TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]); 203 204 assertFalse(cp.isPreWALWriteCalled()); 205 assertFalse(cp.isPostWALWriteCalled()); 206 207 // TEST_FAMILY[2] is not in the put, however it shall be added by the tested 208 // coprocessor. 209 // Use a Put to create familyMap. 210 Put p = creatPutWith2Families(TEST_ROW); 211 212 Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap(); 213 WALEdit edit = new WALEdit(); 214 edit.add(familyMap); 215 216 boolean foundFamily0 = false; 217 boolean foundFamily2 = false; 218 boolean modifiedFamily1 = false; 219 220 List<Cell> cells = edit.getCells(); 221 222 for (Cell cell : cells) { 223 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) { 224 foundFamily0 = true; 225 } 226 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) { 227 foundFamily2 = true; 228 } 229 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) { 230 if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) { 231 modifiedFamily1 = true; 232 } 233 } 234 } 235 assertTrue(foundFamily0); 236 assertFalse(foundFamily2); 237 assertFalse(modifiedFamily1); 238 239 // it's where WAL write cp should occur. 240 long now = EnvironmentEdgeManager.currentTime(); 241 // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors. 242 long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, 243 new MultiVersionConcurrencyControl(), scopes), edit); 244 log.sync(txid); 245 246 // the edit shall have been change now by the coprocessor. 247 foundFamily0 = false; 248 foundFamily2 = false; 249 modifiedFamily1 = false; 250 for (Cell cell : cells) { 251 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) { 252 foundFamily0 = true; 253 } 254 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) { 255 foundFamily2 = true; 256 } 257 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) { 258 if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) { 259 modifiedFamily1 = true; 260 } 261 } 262 } 263 assertFalse(foundFamily0); 264 assertTrue(foundFamily2); 265 assertTrue(modifiedFamily1); 266 267 assertTrue(cp.isPreWALWriteCalled()); 268 assertTrue(cp.isPostWALWriteCalled()); 269 } 270 271 /** 272 * Coprocessors shouldn't get notice of empty waledits. 273 */ 274 @Test 275 public void testEmptyWALEditAreNotSeen() throws Exception { 276 RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE)); 277 TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE)); 278 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 279 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 280 for(byte[] fam : htd.getColumnFamilyNames()) { 281 scopes.put(fam, 0); 282 } 283 WAL log = wals.getWAL(null); 284 try { 285 SampleRegionWALCoprocessor cp = getCoprocessor(log, SampleRegionWALCoprocessor.class); 286 287 cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null); 288 289 assertFalse(cp.isPreWALWriteCalled()); 290 assertFalse(cp.isPostWALWriteCalled()); 291 292 final long now = EnvironmentEdgeManager.currentTime(); 293 long txid = log.appendData(hri, 294 new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), 295 new WALEdit()); 296 log.sync(txid); 297 298 assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled()); 299 assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPostWALWriteCalled()); 300 } finally { 301 log.close(); 302 } 303 } 304 305 /** 306 * Test WAL replay behavior with WALObserver. 307 */ 308 @Test 309 public void testWALCoprocessorReplay() throws Exception { 310 // WAL replay is handled at HRegion::replayRecoveredEdits(), which is 311 // ultimately called by HRegion::initialize() 312 TableName tableName = TableName.valueOf(currentTest.getMethodName()); 313 TableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName); 314 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 315 // final HRegionInfo hri = 316 // createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); 317 // final HRegionInfo hri1 = 318 // createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); 319 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 320 321 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 322 deleteDir(basedir); 323 fs.mkdirs(new Path(basedir, hri.getEncodedName())); 324 325 final Configuration newConf = HBaseConfiguration.create(this.conf); 326 327 // WAL wal = new WAL(this.fs, this.dir, this.oldLogDir, this.conf); 328 WAL wal = wals.getWAL(null); 329 // Put p = creatPutWith2Families(TEST_ROW); 330 WALEdit edit = new WALEdit(); 331 long now = EnvironmentEdgeManager.currentTime(); 332 final int countPerFamily = 1000; 333 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 334 for (byte[] fam : htd.getColumnFamilyNames()) { 335 scopes.put(fam, 0); 336 } 337 for (byte[] fam : htd.getColumnFamilyNames()) { 338 addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily, 339 EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc); 340 } 341 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 342 edit); 343 // sync to fs. 344 wal.sync(); 345 346 User user = HBaseTestingUtility.getDifferentUser(newConf, 347 ".replay.wal.secondtime"); 348 user.runAs(new PrivilegedExceptionAction<Void>() { 349 @Override 350 public Void run() throws Exception { 351 Path p = runWALSplit(newConf); 352 LOG.info("WALSplit path == " + p); 353 // Make a new wal for new region open. 354 final WALFactory wals2 = new WALFactory(conf, 355 ServerName.valueOf(currentTest.getMethodName() + "2", 16010, System.currentTimeMillis()) 356 .toString()); 357 WAL wal2 = wals2.getWAL(null); 358 HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, 359 hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null); 360 361 SampleRegionWALCoprocessor cp2 = 362 region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class); 363 // TODO: asserting here is problematic. 364 assertNotNull(cp2); 365 assertTrue(cp2.isPreWALRestoreCalled()); 366 assertTrue(cp2.isPostWALRestoreCalled()); 367 region.close(); 368 wals2.close(); 369 return null; 370 } 371 }); 372 } 373 374 /** 375 * Test to see CP loaded successfully or not. There is a duplication at 376 * TestHLog, but the purpose of that one is to see whether the loaded CP will 377 * impact existing WAL tests or not. 378 */ 379 @Test 380 public void testWALObserverLoaded() throws Exception { 381 WAL log = wals.getWAL(null); 382 assertNotNull(getCoprocessor(log, SampleRegionWALCoprocessor.class)); 383 } 384 385 @Test 386 public void testWALObserverRoll() throws Exception { 387 final WAL wal = wals.getWAL(null); 388 final SampleRegionWALCoprocessor cp = getCoprocessor(wal, SampleRegionWALCoprocessor.class); 389 cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null); 390 391 assertFalse(cp.isPreWALRollCalled()); 392 assertFalse(cp.isPostWALRollCalled()); 393 394 wal.rollWriter(true); 395 assertTrue(cp.isPreWALRollCalled()); 396 assertTrue(cp.isPostWALRollCalled()); 397 } 398 399 private SampleRegionWALCoprocessor getCoprocessor(WAL wal, 400 Class<? extends SampleRegionWALCoprocessor> clazz) throws Exception { 401 WALCoprocessorHost host = wal.getCoprocessorHost(); 402 Coprocessor c = host.findCoprocessor(clazz.getName()); 403 return (SampleRegionWALCoprocessor) c; 404 } 405 406 /** 407 * Creates an HRI around an HTD that has <code>tableName</code>. 408 * @param tableName Name of table to use. 409 */ 410 private RegionInfo createBasicHRegionInfo(String tableName) { 411 return RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 412 } 413 414 /* 415 * @param p Directory to cleanup 416 */ 417 private void deleteDir(final Path p) throws IOException { 418 if (this.fs.exists(p)) { 419 if (!this.fs.delete(p, true)) { 420 throw new IOException("Failed remove of " + p); 421 } 422 } 423 } 424 425 private Put creatPutWith2Families(byte[] row) throws IOException { 426 Put p = new Put(row); 427 for (int i = 0; i < TEST_FAMILY.length - 1; i++) { 428 p.addColumn(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]); 429 } 430 return p; 431 } 432 433 private Path runWALSplit(final Configuration c) throws IOException { 434 List<Path> splits = WALSplitter.split( 435 hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); 436 // Split should generate only 1 file since there's only 1 region 437 assertEquals(1, splits.size()); 438 // Make sure the file exists 439 assertTrue(fs.exists(splits.get(0))); 440 LOG.info("Split file=" + splits.get(0)); 441 return splits.get(0); 442 } 443 444 private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName, 445 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, 446 final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc) 447 throws IOException { 448 String familyStr = Bytes.toString(family); 449 long txid = -1; 450 for (int j = 0; j < count; j++) { 451 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); 452 byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); 453 WALEdit edit = new WALEdit(); 454 edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); 455 // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care 456 // about legacy coprocessors 457 txid = wal.appendData(hri, 458 new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit); 459 } 460 if (-1 != txid) { 461 wal.sync(txid); 462 } 463 } 464 465 private TableDescriptor getBasic3FamilyHTableDescriptor(TableName tableName) { 466 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 467 Arrays.stream(TEST_FAMILY).map(ColumnFamilyDescriptorBuilder::of) 468 .forEachOrdered(builder::setColumnFamily); 469 return builder.build(); 470 } 471 472 private TableDescriptor createBasic3FamilyHTD(String tableName) { 473 return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) 474 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("a")) 475 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("b")) 476 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("c")).build(); 477 } 478}