001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.security.PrivilegedExceptionAction; 027import java.util.Arrays; 028import java.util.List; 029import java.util.Map; 030import java.util.NavigableMap; 031import java.util.TreeMap; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.CellUtil; 037import org.apache.hadoop.hbase.Coprocessor; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseConfiguration; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.RegionInfoBuilder; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.regionserver.HRegion; 052import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 053import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 054import org.apache.hadoop.hbase.security.User; 055import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.CommonFSUtils; 059import org.apache.hadoop.hbase.util.EnvironmentEdge; 060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 061import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 062import org.apache.hadoop.hbase.wal.WAL; 063import org.apache.hadoop.hbase.wal.WALEdit; 064import org.apache.hadoop.hbase.wal.WALFactory; 065import org.apache.hadoop.hbase.wal.WALKeyImpl; 066import org.apache.hadoop.hbase.wal.WALSplitter; 067import org.junit.After; 068import org.junit.AfterClass; 069import org.junit.Before; 070import org.junit.BeforeClass; 071import org.junit.ClassRule; 072import org.junit.Rule; 073import org.junit.Test; 074import org.junit.experimental.categories.Category; 075import org.junit.rules.TestName; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079/** 080 * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.MasterObserver} interface 081 * hooks at all appropriate times during normal HMaster operations. 082 */ 083@Category({ CoprocessorTests.class, MediumTests.class }) 084public class TestWALObserver { 085 086 @ClassRule 087 public static final HBaseClassTestRule CLASS_RULE = 088 HBaseClassTestRule.forClass(TestWALObserver.class); 089 090 private static final Logger LOG = LoggerFactory.getLogger(TestWALObserver.class); 091 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 092 093 private static byte[] TEST_TABLE = Bytes.toBytes("observedTable"); 094 private static byte[][] TEST_FAMILY = 095 { Bytes.toBytes("fam1"), Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), }; 096 private static byte[][] TEST_QUALIFIER = 097 { Bytes.toBytes("q1"), Bytes.toBytes("q2"), Bytes.toBytes("q3"), }; 098 private static byte[][] TEST_VALUE = 099 { Bytes.toBytes("v1"), Bytes.toBytes("v2"), Bytes.toBytes("v3"), }; 100 private static byte[] TEST_ROW = Bytes.toBytes("testRow"); 101 102 @Rule 103 public TestName currentTest = new TestName(); 104 105 private Configuration conf; 106 private FileSystem fs; 107 private Path hbaseRootDir; 108 private Path hbaseWALRootDir; 109 private Path oldLogDir; 110 private Path logDir; 111 private WALFactory wals; 112 113 @BeforeClass 114 public static void setupBeforeClass() throws Exception { 115 Configuration conf = TEST_UTIL.getConfiguration(); 116 conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 117 SampleRegionWALCoprocessor.class.getName()); 118 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 119 SampleRegionWALCoprocessor.class.getName()); 120 conf.setInt("dfs.client.block.recovery.retries", 2); 121 122 TEST_UTIL.startMiniCluster(1); 123 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); 124 Path hbaseWALRootDir = 125 TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaseLogRoot")); 126 LOG.info("hbase.rootdir=" + hbaseRootDir); 127 CommonFSUtils.setRootDir(conf, hbaseRootDir); 128 CommonFSUtils.setWALRootDir(conf, hbaseWALRootDir); 129 } 130 131 @AfterClass 132 public static void teardownAfterClass() throws Exception { 133 TEST_UTIL.shutdownMiniCluster(); 134 } 135 136 @Before 137 public void setUp() throws Exception { 138 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 139 // this.cluster = TEST_UTIL.getDFSCluster(); 140 this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); 141 this.hbaseRootDir = CommonFSUtils.getRootDir(conf); 142 this.hbaseWALRootDir = CommonFSUtils.getWALRootDir(conf); 143 this.oldLogDir = new Path(this.hbaseWALRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 144 String serverName = ServerName 145 .valueOf(currentTest.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()).toString(); 146 this.logDir = 147 new Path(this.hbaseWALRootDir, AbstractFSWALProvider.getWALDirectoryName(serverName)); 148 149 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { 150 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 151 } 152 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) { 153 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true); 154 } 155 this.wals = new WALFactory(conf, serverName); 156 } 157 158 @After 159 public void tearDown() throws Exception { 160 try { 161 wals.shutdown(); 162 } catch (IOException exception) { 163 // one of our tests splits out from under our wals. 164 LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage()); 165 LOG.debug("details of failure to close wal factory.", exception); 166 } 167 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 168 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true); 169 } 170 171 /** 172 * Test WAL write behavior with WALObserver. The coprocessor monitors a WALEdit written to WAL, 173 * and ignore, modify, and add KeyValue's for the WALEdit. 174 */ 175 @Test 176 public void testWALObserverWriteToWAL() throws Exception { 177 final WAL log = wals.getWAL(null); 178 verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALCoprocessor.class), false); 179 } 180 181 private void verifyWritesSeen(final WAL log, final SampleRegionWALCoprocessor cp, 182 final boolean seesLegacy) throws Exception { 183 RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE)); 184 TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE)); 185 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 186 for (byte[] fam : htd.getColumnFamilyNames()) { 187 scopes.put(fam, 0); 188 } 189 Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE)); 190 deleteDir(basedir); 191 fs.mkdirs(new Path(basedir, hri.getEncodedName())); 192 193 // TEST_FAMILY[0] shall be removed from WALEdit. 194 // TEST_FAMILY[1] value shall be changed. 195 // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put. 196 cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0], TEST_FAMILY[1], 197 TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]); 198 199 assertFalse(cp.isPreWALWriteCalled()); 200 assertFalse(cp.isPostWALWriteCalled()); 201 202 // TEST_FAMILY[2] is not in the put, however it shall be added by the tested 203 // coprocessor. 204 // Use a Put to create familyMap. 205 Put p = creatPutWith2Families(TEST_ROW); 206 207 Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap(); 208 WALEdit edit = new WALEdit(); 209 edit.add(familyMap); 210 211 boolean foundFamily0 = false; 212 boolean foundFamily2 = false; 213 boolean modifiedFamily1 = false; 214 215 List<Cell> cells = edit.getCells(); 216 217 for (Cell cell : cells) { 218 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) { 219 foundFamily0 = true; 220 } 221 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) { 222 foundFamily2 = true; 223 } 224 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) { 225 if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) { 226 modifiedFamily1 = true; 227 } 228 } 229 } 230 assertTrue(foundFamily0); 231 assertFalse(foundFamily2); 232 assertFalse(modifiedFamily1); 233 234 // it's where WAL write cp should occur. 235 long now = EnvironmentEdgeManager.currentTime(); 236 // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors. 237 long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, 238 new MultiVersionConcurrencyControl(), scopes), edit); 239 log.sync(txid); 240 241 // the edit shall have been change now by the coprocessor. 242 foundFamily0 = false; 243 foundFamily2 = false; 244 modifiedFamily1 = false; 245 for (Cell cell : cells) { 246 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) { 247 foundFamily0 = true; 248 } 249 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) { 250 foundFamily2 = true; 251 } 252 if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) { 253 if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) { 254 modifiedFamily1 = true; 255 } 256 } 257 } 258 assertFalse(foundFamily0); 259 assertTrue(foundFamily2); 260 assertTrue(modifiedFamily1); 261 262 assertTrue(cp.isPreWALWriteCalled()); 263 assertTrue(cp.isPostWALWriteCalled()); 264 } 265 266 /** 267 * Coprocessors shouldn't get notice of empty waledits. 268 */ 269 @Test 270 public void testEmptyWALEditAreNotSeen() throws Exception { 271 RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE)); 272 TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE)); 273 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 274 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 275 for (byte[] fam : htd.getColumnFamilyNames()) { 276 scopes.put(fam, 0); 277 } 278 WAL log = wals.getWAL(null); 279 try { 280 SampleRegionWALCoprocessor cp = getCoprocessor(log, SampleRegionWALCoprocessor.class); 281 282 cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null); 283 284 assertFalse(cp.isPreWALWriteCalled()); 285 assertFalse(cp.isPostWALWriteCalled()); 286 287 final long now = EnvironmentEdgeManager.currentTime(); 288 long txid = log.appendData(hri, 289 new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), 290 new WALEdit()); 291 log.sync(txid); 292 293 assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled()); 294 assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPostWALWriteCalled()); 295 } finally { 296 log.close(); 297 } 298 } 299 300 /** 301 * Test WAL replay behavior with WALObserver. 302 */ 303 @Test 304 public void testWALCoprocessorReplay() throws Exception { 305 // WAL replay is handled at HRegion::replayRecoveredEdits(), which is 306 // ultimately called by HRegion::initialize() 307 TableName tableName = TableName.valueOf(currentTest.getMethodName()); 308 TableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName); 309 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 310 // final HRegionInfo hri = 311 // createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); 312 // final HRegionInfo hri1 = 313 // createBasic3FamilyHRegionInfo(Bytes.toString(tableName)); 314 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 315 316 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 317 deleteDir(basedir); 318 fs.mkdirs(new Path(basedir, hri.getEncodedName())); 319 320 final Configuration newConf = HBaseConfiguration.create(this.conf); 321 322 // WAL wal = new WAL(this.fs, this.dir, this.oldLogDir, this.conf); 323 WAL wal = wals.getWAL(null); 324 // Put p = creatPutWith2Families(TEST_ROW); 325 WALEdit edit = new WALEdit(); 326 long now = EnvironmentEdgeManager.currentTime(); 327 final int countPerFamily = 1000; 328 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 329 for (byte[] fam : htd.getColumnFamilyNames()) { 330 scopes.put(fam, 0); 331 } 332 for (byte[] fam : htd.getColumnFamilyNames()) { 333 addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily, 334 EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc); 335 } 336 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 337 edit); 338 // sync to fs. 339 wal.sync(); 340 341 User user = HBaseTestingUtility.getDifferentUser(newConf, ".replay.wal.secondtime"); 342 user.runAs(new PrivilegedExceptionAction<Void>() { 343 @Override 344 public Void run() throws Exception { 345 Path p = runWALSplit(newConf); 346 LOG.info("WALSplit path == " + p); 347 // Make a new wal for new region open. 348 final WALFactory wals2 = new WALFactory(conf, 349 ServerName 350 .valueOf(currentTest.getMethodName() + "2", 16010, EnvironmentEdgeManager.currentTime()) 351 .toString()); 352 WAL wal2 = wals2.getWAL(null); 353 HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, 354 htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null); 355 356 SampleRegionWALCoprocessor cp2 = 357 region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class); 358 // TODO: asserting here is problematic. 359 assertNotNull(cp2); 360 assertTrue(cp2.isPreWALRestoreCalled()); 361 assertTrue(cp2.isPostWALRestoreCalled()); 362 region.close(); 363 wals2.close(); 364 return null; 365 } 366 }); 367 } 368 369 /** 370 * Test to see CP loaded successfully or not. There is a duplication at TestHLog, but the purpose 371 * of that one is to see whether the loaded CP will impact existing WAL tests or not. 372 */ 373 @Test 374 public void testWALObserverLoaded() throws Exception { 375 WAL log = wals.getWAL(null); 376 assertNotNull(getCoprocessor(log, SampleRegionWALCoprocessor.class)); 377 } 378 379 @Test 380 public void testWALObserverRoll() throws Exception { 381 final WAL wal = wals.getWAL(null); 382 final SampleRegionWALCoprocessor cp = getCoprocessor(wal, SampleRegionWALCoprocessor.class); 383 cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null); 384 385 assertFalse(cp.isPreWALRollCalled()); 386 assertFalse(cp.isPostWALRollCalled()); 387 388 wal.rollWriter(true); 389 assertTrue(cp.isPreWALRollCalled()); 390 assertTrue(cp.isPostWALRollCalled()); 391 } 392 393 private SampleRegionWALCoprocessor getCoprocessor(WAL wal, 394 Class<? extends SampleRegionWALCoprocessor> clazz) throws Exception { 395 WALCoprocessorHost host = wal.getCoprocessorHost(); 396 Coprocessor c = host.findCoprocessor(clazz.getName()); 397 return (SampleRegionWALCoprocessor) c; 398 } 399 400 /** 401 * Creates an HRI around an HTD that has <code>tableName</code>. 402 * @param tableName Name of table to use. 403 */ 404 private RegionInfo createBasicHRegionInfo(String tableName) { 405 return RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 406 } 407 408 /* 409 * @param p Directory to cleanup 410 */ 411 private void deleteDir(final Path p) throws IOException { 412 if (this.fs.exists(p)) { 413 if (!this.fs.delete(p, true)) { 414 throw new IOException("Failed remove of " + p); 415 } 416 } 417 } 418 419 private Put creatPutWith2Families(byte[] row) throws IOException { 420 Put p = new Put(row); 421 for (int i = 0; i < TEST_FAMILY.length - 1; i++) { 422 p.addColumn(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]); 423 } 424 return p; 425 } 426 427 private Path runWALSplit(final Configuration c) throws IOException { 428 List<Path> splits = 429 WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); 430 // Split should generate only 1 file since there's only 1 region 431 assertEquals(1, splits.size()); 432 // Make sure the file exists 433 assertTrue(fs.exists(splits.get(0))); 434 LOG.info("Split file=" + splits.get(0)); 435 return splits.get(0); 436 } 437 438 private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName, 439 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, 440 final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc) 441 throws IOException { 442 String familyStr = Bytes.toString(family); 443 long txid = -1; 444 for (int j = 0; j < count; j++) { 445 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); 446 byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); 447 WALEdit edit = new WALEdit(); 448 edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); 449 // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care 450 // about legacy coprocessors 451 txid = wal.appendData(hri, 452 new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit); 453 } 454 if (-1 != txid) { 455 wal.sync(txid); 456 } 457 } 458 459 private TableDescriptor getBasic3FamilyHTableDescriptor(TableName tableName) { 460 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 461 Arrays.stream(TEST_FAMILY).map(ColumnFamilyDescriptorBuilder::of) 462 .forEachOrdered(builder::setColumnFamily); 463 return builder.build(); 464 } 465 466 private TableDescriptor createBasic3FamilyHTD(String tableName) { 467 return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) 468 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("a")) 469 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("b")) 470 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("c")).build(); 471 } 472}