001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.security.PrivilegedExceptionAction;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Map;
030import java.util.NavigableMap;
031import java.util.TreeMap;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellUtil;
037import org.apache.hadoop.hbase.Coprocessor;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.client.RegionInfoBuilder;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
052import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
053import org.apache.hadoop.hbase.security.User;
054import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.CommonFSUtils;
058import org.apache.hadoop.hbase.util.EnvironmentEdge;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
061import org.apache.hadoop.hbase.wal.WAL;
062import org.apache.hadoop.hbase.wal.WALEdit;
063import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
064import org.apache.hadoop.hbase.wal.WALFactory;
065import org.apache.hadoop.hbase.wal.WALKeyImpl;
066import org.apache.hadoop.hbase.wal.WALSplitter;
067import org.junit.jupiter.api.AfterAll;
068import org.junit.jupiter.api.AfterEach;
069import org.junit.jupiter.api.BeforeAll;
070import org.junit.jupiter.api.BeforeEach;
071import org.junit.jupiter.api.Tag;
072import org.junit.jupiter.api.Test;
073import org.junit.jupiter.api.TestInfo;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
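/**
 * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.WALObserver} interface hooks
 * when edits are appended to a WAL and when the WAL is rolled, and checks that the WAL restore and
 * replay hooks recorded by {@link SampleRegionWALCoprocessor} fire when a region is opened.
 */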
081@Tag(CoprocessorTests.TAG)
082@Tag(MediumTests.TAG)
083public class TestWALObserver {
084
085  private static final Logger LOG = LoggerFactory.getLogger(TestWALObserver.class);
086  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
087
088  private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
089  private static byte[][] TEST_FAMILY =
090    { Bytes.toBytes("fam1"), Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), };
091  private static byte[][] TEST_QUALIFIER =
092    { Bytes.toBytes("q1"), Bytes.toBytes("q2"), Bytes.toBytes("q3"), };
093  private static byte[][] TEST_VALUE =
094    { Bytes.toBytes("v1"), Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
095  private static byte[] TEST_ROW = Bytes.toBytes("testRow");
096
097  private String currentTestName;
098
099  private Configuration conf;
100  private FileSystem fs;
101  private Path hbaseRootDir;
102  private Path hbaseWALRootDir;
103  private Path oldLogDir;
104  private Path logDir;
105  private WALFactory wals;
106
107  @BeforeAll
108  public static void setupBeforeClass() throws Exception {
109    Configuration conf = TEST_UTIL.getConfiguration();
110    conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
111      SampleRegionWALCoprocessor.class.getName());
112    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
113      SampleRegionWALCoprocessor.class.getName());
114    conf.setInt("dfs.client.block.recovery.retries", 2);
115
116    TEST_UTIL.startMiniCluster(1);
117    Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
118    Path hbaseWALRootDir =
119      TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaseLogRoot"));
120    LOG.info("hbase.rootdir=" + hbaseRootDir);
121    CommonFSUtils.setRootDir(conf, hbaseRootDir);
122    CommonFSUtils.setWALRootDir(conf, hbaseWALRootDir);
123  }
124
125  @AfterAll
126  public static void teardownAfterClass() throws Exception {
127    TEST_UTIL.shutdownMiniCluster();
128  }
129
130  @BeforeEach
131  public void setUp(TestInfo testInfo) throws Exception {
132    currentTestName = testInfo.getTestMethod().get().getName();
133    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
134    // this.cluster = TEST_UTIL.getDFSCluster();
135    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
136    this.hbaseRootDir = CommonFSUtils.getRootDir(conf);
137    this.hbaseWALRootDir = CommonFSUtils.getWALRootDir(conf);
138    this.oldLogDir = new Path(this.hbaseWALRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
139    String serverName =
140      ServerName.valueOf(currentTestName, 16010, EnvironmentEdgeManager.currentTime()).toString();
141    this.logDir =
142      new Path(this.hbaseWALRootDir, AbstractFSWALProvider.getWALDirectoryName(serverName));
143
144    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
145      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
146    }
147    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
148      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
149    }
150    this.wals = new WALFactory(conf, serverName);
151  }
152
153  @AfterEach
154  public void tearDown() throws Exception {
155    try {
156      wals.shutdown();
157    } catch (IOException exception) {
158      // one of our tests splits out from under our wals.
159      LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage());
160      LOG.debug("details of failure to close wal factory.", exception);
161    }
162    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
163    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
164  }
165
166  /**
167   * Test WAL write behavior with WALObserver. The coprocessor monitors a WALEdit written to WAL,
168   * and ignore, modify, and add KeyValue's for the WALEdit.
169   */
170  @Test
171  public void testWALObserverWriteToWAL() throws Exception {
172    final WAL log = wals.getWAL(null);
173    verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALCoprocessor.class), false);
174  }
175
176  private void verifyWritesSeen(final WAL log, final SampleRegionWALCoprocessor cp,
177    final boolean seesLegacy) throws Exception {
178    RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE));
179    TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
180    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
181    for (byte[] fam : htd.getColumnFamilyNames()) {
182      scopes.put(fam, 0);
183    }
184    Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
185    deleteDir(basedir);
186    fs.mkdirs(new Path(basedir, hri.getEncodedName()));
187
188    // TEST_FAMILY[0] shall be removed from WALEdit.
189    // TEST_FAMILY[1] value shall be changed.
190    // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
191    cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0], TEST_FAMILY[1],
192      TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]);
193
194    assertFalse(cp.isPreWALWriteCalled());
195    assertFalse(cp.isPostWALWriteCalled());
196
197    // TEST_FAMILY[2] is not in the put, however it shall be added by the tested
198    // coprocessor.
199    // Use a Put to create familyMap.
200    Put p = creatPutWith2Families(TEST_ROW);
201
202    Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap();
203    WALEdit edit = new WALEdit();
204    edit.add(familyMap);
205
206    boolean foundFamily0 = false;
207    boolean foundFamily2 = false;
208    boolean modifiedFamily1 = false;
209
210    List<Cell> cells = edit.getCells();
211
212    for (Cell cell : cells) {
213      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) {
214        foundFamily0 = true;
215      }
216      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) {
217        foundFamily2 = true;
218      }
219      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) {
220        if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) {
221          modifiedFamily1 = true;
222        }
223      }
224    }
225    assertTrue(foundFamily0);
226    assertFalse(foundFamily2);
227    assertFalse(modifiedFamily1);
228
229    // it's where WAL write cp should occur.
230    long now = EnvironmentEdgeManager.currentTime();
231    // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors.
232    long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now,
233      new MultiVersionConcurrencyControl(), scopes), edit);
234    log.sync(txid);
235
236    // the edit shall have been change now by the coprocessor.
237    foundFamily0 = false;
238    foundFamily2 = false;
239    modifiedFamily1 = false;
240    for (Cell cell : cells) {
241      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) {
242        foundFamily0 = true;
243      }
244      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) {
245        foundFamily2 = true;
246      }
247      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) {
248        if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) {
249          modifiedFamily1 = true;
250        }
251      }
252    }
253    assertFalse(foundFamily0);
254    assertTrue(foundFamily2);
255    assertTrue(modifiedFamily1);
256
257    assertTrue(cp.isPreWALWriteCalled());
258    assertTrue(cp.isPostWALWriteCalled());
259  }
260
261  /**
262   * Coprocessors shouldn't get notice of empty waledits.
263   */
264  @Test
265  public void testEmptyWALEditAreNotSeen() throws Exception {
266    RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE));
267    TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
268    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
269    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
270    for (byte[] fam : htd.getColumnFamilyNames()) {
271      scopes.put(fam, 0);
272    }
273    WAL log = wals.getWAL(null);
274    try {
275      SampleRegionWALCoprocessor cp = getCoprocessor(log, SampleRegionWALCoprocessor.class);
276
277      cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
278
279      assertFalse(cp.isPreWALWriteCalled());
280      assertFalse(cp.isPostWALWriteCalled());
281
282      final long now = EnvironmentEdgeManager.currentTime();
283      long txid = log.appendData(hri,
284        new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
285        new WALEdit());
286      log.sync(txid);
287
288      assertFalse(cp.isPreWALWriteCalled(), "Empty WALEdit should skip coprocessor evaluation.");
289      assertFalse(cp.isPostWALWriteCalled(), "Empty WALEdit should skip coprocessor evaluation.");
290    } finally {
291      log.close();
292    }
293  }
294
295  /**
296   * Test WAL replay behavior with WALObserver.
297   */
298  @Test
299  public void testWALCoprocessorReplay() throws Exception {
300    // WAL replay is handled at HRegion::replayRecoveredEdits(), which is
301    // ultimately called by HRegion::initialize()
302    TableName tableName = TableName.valueOf(currentTestName);
303    TableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
304    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
305    // final HRegionInfo hri =
306    // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
307    // final HRegionInfo hri1 =
308    // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
309    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
310
311    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
312    deleteDir(basedir);
313    fs.mkdirs(new Path(basedir, hri.getEncodedName()));
314
315    final Configuration newConf = HBaseConfiguration.create(this.conf);
316
317    // WAL wal = new WAL(this.fs, this.dir, this.oldLogDir, this.conf);
318    WAL wal = wals.getWAL(null);
319    // Put p = creatPutWith2Families(TEST_ROW);
320    WALEdit edit = new WALEdit();
321    long now = EnvironmentEdgeManager.currentTime();
322    final int countPerFamily = 1000;
323    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
324    for (byte[] fam : htd.getColumnFamilyNames()) {
325      scopes.put(fam, 0);
326    }
327    for (byte[] fam : htd.getColumnFamilyNames()) {
328      addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily,
329        EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
330    }
331    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
332      edit);
333    // sync to fs.
334    wal.sync();
335
336    User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime");
337    user.runAs(new PrivilegedExceptionAction<Void>() {
338      @Override
339      public Void run() throws Exception {
340        Path p = runWALSplit(newConf);
341        LOG.info("WALSplit path == " + p);
342        // Make a new wal for new region open.
343        final WALFactory wals2 = new WALFactory(conf, ServerName
344          .valueOf(currentTestName + "2", 16010, EnvironmentEdgeManager.currentTime()).toString());
345        WAL wal2 = wals2.getWAL(null);
346        HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri,
347          htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null);
348
349        SampleRegionWALCoprocessor cp2 =
350          region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class);
351        assertNotNull(cp2);
352        assertTrue(cp2.isPreWALRestoreCalled());
353        assertTrue(cp2.isPostWALRestoreCalled());
354        assertTrue(cp2.isPreReplayWALsCalled());
355        assertTrue(cp2.isPostReplayWALsCalled());
356        region.close();
357        wals2.close();
358        return null;
359      }
360    });
361  }
362
363  /**
364   * Test to see CP loaded successfully or not. There is a duplication at TestHLog, but the purpose
365   * of that one is to see whether the loaded CP will impact existing WAL tests or not.
366   */
367  @Test
368  public void testWALObserverLoaded() throws Exception {
369    WAL log = wals.getWAL(null);
370    assertNotNull(getCoprocessor(log, SampleRegionWALCoprocessor.class));
371  }
372
373  @Test
374  public void testWALObserverRoll() throws Exception {
375    final WAL wal = wals.getWAL(null);
376    final SampleRegionWALCoprocessor cp = getCoprocessor(wal, SampleRegionWALCoprocessor.class);
377    cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
378
379    assertFalse(cp.isPreWALRollCalled());
380    assertFalse(cp.isPostWALRollCalled());
381
382    wal.rollWriter(true);
383    assertTrue(cp.isPreWALRollCalled());
384    assertTrue(cp.isPostWALRollCalled());
385  }
386
387  private SampleRegionWALCoprocessor getCoprocessor(WAL wal,
388    Class<? extends SampleRegionWALCoprocessor> clazz) throws Exception {
389    WALCoprocessorHost host = wal.getCoprocessorHost();
390    Coprocessor c = host.findCoprocessor(clazz.getName());
391    return (SampleRegionWALCoprocessor) c;
392  }
393
394  /**
395   * Creates an HRI around an HTD that has <code>tableName</code>.
396   * @param tableName Name of table to use.
397   */
398  private RegionInfo createBasicHRegionInfo(String tableName) {
399    return RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
400  }
401
402  /*
403   * @param p Directory to cleanup
404   */
405  private void deleteDir(final Path p) throws IOException {
406    if (this.fs.exists(p)) {
407      if (!this.fs.delete(p, true)) {
408        throw new IOException("Failed remove of " + p);
409      }
410    }
411  }
412
413  private Put creatPutWith2Families(byte[] row) throws IOException {
414    Put p = new Put(row);
415    for (int i = 0; i < TEST_FAMILY.length - 1; i++) {
416      p.addColumn(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]);
417    }
418    return p;
419  }
420
421  private Path runWALSplit(final Configuration c) throws IOException {
422    List<Path> splits =
423      WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
424    // Split should generate only 1 file since there's only 1 region
425    assertEquals(1, splits.size());
426    // Make sure the file exists
427    assertTrue(fs.exists(splits.get(0)));
428    LOG.info("Split file=" + splits.get(0));
429    return splits.get(0);
430  }
431
432  private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName,
433    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
434    final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc)
435    throws IOException {
436    String familyStr = Bytes.toString(family);
437    long txid = -1;
438    for (int j = 0; j < count; j++) {
439      byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
440      byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
441      WALEdit edit = new WALEdit();
442      WALEditInternalHelper.addExtendedCell(edit,
443        new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
444      // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care
445      // about legacy coprocessors
446      txid = wal.appendData(hri,
447        new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit);
448    }
449    if (-1 != txid) {
450      wal.sync(txid);
451    }
452  }
453
454  private TableDescriptor getBasic3FamilyHTableDescriptor(TableName tableName) {
455    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
456    Arrays.stream(TEST_FAMILY).map(ColumnFamilyDescriptorBuilder::of)
457      .forEachOrdered(builder::setColumnFamily);
458    return builder.build();
459  }
460
461  private TableDescriptor createBasic3FamilyHTD(String tableName) {
462    return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
463      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("a"))
464      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("b"))
465      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("c")).build();
466  }
467}