001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.security.PrivilegedExceptionAction;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Map;
030import java.util.NavigableMap;
031import java.util.TreeMap;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellUtil;
037import org.apache.hadoop.hbase.Coprocessor;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.regionserver.HRegion;
052import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
053import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
054import org.apache.hadoop.hbase.security.User;
055import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.EnvironmentEdge;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.FSUtils;
061import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
062import org.apache.hadoop.hbase.wal.WAL;
063import org.apache.hadoop.hbase.wal.WALEdit;
064import org.apache.hadoop.hbase.wal.WALFactory;
065import org.apache.hadoop.hbase.wal.WALKeyImpl;
066import org.apache.hadoop.hbase.wal.WALSplitter;
067import org.junit.After;
068import org.junit.AfterClass;
069import org.junit.Before;
070import org.junit.BeforeClass;
071import org.junit.ClassRule;
072import org.junit.Rule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.junit.rules.TestName;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079/**
080 * Tests invocation of the
081 * {@link org.apache.hadoop.hbase.coprocessor.MasterObserver} interface hooks at
082 * all appropriate times during normal HMaster operations.
083 */
084@Category({CoprocessorTests.class, MediumTests.class})
085public class TestWALObserver {
086
087  @ClassRule
088  public static final HBaseClassTestRule CLASS_RULE =
089      HBaseClassTestRule.forClass(TestWALObserver.class);
090
091  private static final Logger LOG = LoggerFactory.getLogger(TestWALObserver.class);
092  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
093
094  private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
095  private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
096      Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), };
097  private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
098      Bytes.toBytes("q2"), Bytes.toBytes("q3"), };
099  private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
100      Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
101  private static byte[] TEST_ROW = Bytes.toBytes("testRow");
102
103  @Rule
104  public TestName currentTest = new TestName();
105
106  private Configuration conf;
107  private FileSystem fs;
108  private Path hbaseRootDir;
109  private Path hbaseWALRootDir;
110  private Path oldLogDir;
111  private Path logDir;
112  private WALFactory wals;
113
114  @BeforeClass
115  public static void setupBeforeClass() throws Exception {
116    Configuration conf = TEST_UTIL.getConfiguration();
117    conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
118        SampleRegionWALCoprocessor.class.getName());
119    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
120        SampleRegionWALCoprocessor.class.getName());
121    conf.setInt("dfs.client.block.recovery.retries", 2);
122
123    TEST_UTIL.startMiniCluster(1);
124    Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
125        .makeQualified(new Path("/hbase"));
126    Path hbaseWALRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
127            .makeQualified(new Path("/hbaseLogRoot"));
128    LOG.info("hbase.rootdir=" + hbaseRootDir);
129    FSUtils.setRootDir(conf, hbaseRootDir);
130    FSUtils.setWALRootDir(conf, hbaseWALRootDir);
131  }
132
133  @AfterClass
134  public static void teardownAfterClass() throws Exception {
135    TEST_UTIL.shutdownMiniCluster();
136  }
137
138  @Before
139  public void setUp() throws Exception {
140    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
141    // this.cluster = TEST_UTIL.getDFSCluster();
142    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
143    this.hbaseRootDir = FSUtils.getRootDir(conf);
144    this.hbaseWALRootDir = FSUtils.getWALRootDir(conf);
145    this.oldLogDir = new Path(this.hbaseWALRootDir,
146        HConstants.HREGION_OLDLOGDIR_NAME);
147    String serverName = ServerName.valueOf(currentTest.getMethodName(), 16010,
148        System.currentTimeMillis()).toString();
149    this.logDir = new Path(this.hbaseWALRootDir,
150      AbstractFSWALProvider.getWALDirectoryName(serverName));
151
152    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
153      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
154    }
155    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
156      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
157    }
158    this.wals = new WALFactory(conf, serverName);
159  }
160
161  @After
162  public void tearDown() throws Exception {
163    try {
164      wals.shutdown();
165    } catch (IOException exception) {
166      // one of our tests splits out from under our wals.
167      LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage());
168      LOG.debug("details of failure to close wal factory.", exception);
169    }
170    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
171    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
172  }
173
174  /**
175   * Test WAL write behavior with WALObserver. The coprocessor monitors a
176   * WALEdit written to WAL, and ignore, modify, and add KeyValue's for the
177   * WALEdit.
178   */
179  @Test
180  public void testWALObserverWriteToWAL() throws Exception {
181    final WAL log = wals.getWAL(null);
182    verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALCoprocessor.class), false);
183  }
184
185  private void verifyWritesSeen(final WAL log, final SampleRegionWALCoprocessor cp,
186      final boolean seesLegacy) throws Exception {
187    RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE));
188    TableDescriptor htd = createBasic3FamilyHTD(Bytes
189        .toString(TEST_TABLE));
190    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
191    for (byte[] fam : htd.getColumnFamilyNames()) {
192      scopes.put(fam, 0);
193    }
194    Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
195    deleteDir(basedir);
196    fs.mkdirs(new Path(basedir, hri.getEncodedName()));
197
198    // TEST_FAMILY[0] shall be removed from WALEdit.
199    // TEST_FAMILY[1] value shall be changed.
200    // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
201    cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
202        TEST_FAMILY[1], TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]);
203
204    assertFalse(cp.isPreWALWriteCalled());
205    assertFalse(cp.isPostWALWriteCalled());
206
207    // TEST_FAMILY[2] is not in the put, however it shall be added by the tested
208    // coprocessor.
209    // Use a Put to create familyMap.
210    Put p = creatPutWith2Families(TEST_ROW);
211
212    Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap();
213    WALEdit edit = new WALEdit();
214    edit.add(familyMap);
215
216    boolean foundFamily0 = false;
217    boolean foundFamily2 = false;
218    boolean modifiedFamily1 = false;
219
220    List<Cell> cells = edit.getCells();
221
222    for (Cell cell : cells) {
223      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) {
224        foundFamily0 = true;
225      }
226      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) {
227        foundFamily2 = true;
228      }
229      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) {
230        if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) {
231          modifiedFamily1 = true;
232        }
233      }
234    }
235    assertTrue(foundFamily0);
236    assertFalse(foundFamily2);
237    assertFalse(modifiedFamily1);
238
239    // it's where WAL write cp should occur.
240    long now = EnvironmentEdgeManager.currentTime();
241    // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors.
242    long txid = log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now,
243        new MultiVersionConcurrencyControl(), scopes),
244      edit, true);
245    log.sync(txid);
246
247    // the edit shall have been change now by the coprocessor.
248    foundFamily0 = false;
249    foundFamily2 = false;
250    modifiedFamily1 = false;
251    for (Cell cell : cells) {
252      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[0])) {
253        foundFamily0 = true;
254      }
255      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[2])) {
256        foundFamily2 = true;
257      }
258      if (Arrays.equals(CellUtil.cloneFamily(cell), TEST_FAMILY[1])) {
259        if (!Arrays.equals(CellUtil.cloneValue(cell), TEST_VALUE[1])) {
260          modifiedFamily1 = true;
261        }
262      }
263    }
264    assertFalse(foundFamily0);
265    assertTrue(foundFamily2);
266    assertTrue(modifiedFamily1);
267
268    assertTrue(cp.isPreWALWriteCalled());
269    assertTrue(cp.isPostWALWriteCalled());
270  }
271
272  /**
273   * Coprocessors shouldn't get notice of empty waledits.
274   */
275  @Test
276  public void testEmptyWALEditAreNotSeen() throws Exception {
277    RegionInfo hri = createBasicHRegionInfo(Bytes.toString(TEST_TABLE));
278    TableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
279    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
280    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
281    for(byte[] fam : htd.getColumnFamilyNames()) {
282      scopes.put(fam, 0);
283    }
284    WAL log = wals.getWAL(null);
285    try {
286      SampleRegionWALCoprocessor cp = getCoprocessor(log, SampleRegionWALCoprocessor.class);
287
288      cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
289
290      assertFalse(cp.isPreWALWriteCalled());
291      assertFalse(cp.isPostWALWriteCalled());
292
293      final long now = EnvironmentEdgeManager.currentTime();
294      long txid = log.append(hri,
295          new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
296          new WALEdit(), true);
297      log.sync(txid);
298
299      assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
300      assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPostWALWriteCalled());
301    } finally {
302      log.close();
303    }
304  }
305
306  /**
307   * Test WAL replay behavior with WALObserver.
308   */
309  @Test
310  public void testWALCoprocessorReplay() throws Exception {
311    // WAL replay is handled at HRegion::replayRecoveredEdits(), which is
312    // ultimately called by HRegion::initialize()
313    TableName tableName = TableName.valueOf(currentTest.getMethodName());
314    TableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
315    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
316    // final HRegionInfo hri =
317    // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
318    // final HRegionInfo hri1 =
319    // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
320    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
321
322    final Path basedir =
323        FSUtils.getTableDir(this.hbaseRootDir, tableName);
324    deleteDir(basedir);
325    fs.mkdirs(new Path(basedir, hri.getEncodedName()));
326
327    final Configuration newConf = HBaseConfiguration.create(this.conf);
328
329    // WAL wal = new WAL(this.fs, this.dir, this.oldLogDir, this.conf);
330    WAL wal = wals.getWAL(null);
331    // Put p = creatPutWith2Families(TEST_ROW);
332    WALEdit edit = new WALEdit();
333    long now = EnvironmentEdgeManager.currentTime();
334    final int countPerFamily = 1000;
335    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
336    for (byte[] fam : htd.getColumnFamilyNames()) {
337      scopes.put(fam, 0);
338    }
339    for (byte[] fam : htd.getColumnFamilyNames()) {
340      addWALEdits(tableName, hri, TEST_ROW, fam, countPerFamily,
341        EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
342    }
343    wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
344      true);
345    // sync to fs.
346    wal.sync();
347
348    User user = HBaseTestingUtility.getDifferentUser(newConf,
349        ".replay.wal.secondtime");
350    user.runAs(new PrivilegedExceptionAction<Void>() {
351      @Override
352      public Void run() throws Exception {
353        Path p = runWALSplit(newConf);
354        LOG.info("WALSplit path == " + p);
355        // Make a new wal for new region open.
356        final WALFactory wals2 = new WALFactory(conf,
357            ServerName.valueOf(currentTest.getMethodName() + "2", 16010, System.currentTimeMillis())
358                .toString());
359        WAL wal2 = wals2.getWAL(null);
360        HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir,
361            hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null);
362
363        SampleRegionWALCoprocessor cp2 =
364          region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class);
365        // TODO: asserting here is problematic.
366        assertNotNull(cp2);
367        assertTrue(cp2.isPreWALRestoreCalled());
368        assertTrue(cp2.isPostWALRestoreCalled());
369        region.close();
370        wals2.close();
371        return null;
372      }
373    });
374  }
375
376  /**
377   * Test to see CP loaded successfully or not. There is a duplication at
378   * TestHLog, but the purpose of that one is to see whether the loaded CP will
379   * impact existing WAL tests or not.
380   */
381  @Test
382  public void testWALObserverLoaded() throws Exception {
383    WAL log = wals.getWAL(null);
384    assertNotNull(getCoprocessor(log, SampleRegionWALCoprocessor.class));
385  }
386
387  @Test
388  public void testWALObserverRoll() throws Exception {
389    final WAL wal = wals.getWAL(null);
390    final SampleRegionWALCoprocessor cp = getCoprocessor(wal, SampleRegionWALCoprocessor.class);
391    cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
392
393    assertFalse(cp.isPreWALRollCalled());
394    assertFalse(cp.isPostWALRollCalled());
395
396    wal.rollWriter(true);
397    assertTrue(cp.isPreWALRollCalled());
398    assertTrue(cp.isPostWALRollCalled());
399  }
400
401  private SampleRegionWALCoprocessor getCoprocessor(WAL wal,
402      Class<? extends SampleRegionWALCoprocessor> clazz) throws Exception {
403    WALCoprocessorHost host = wal.getCoprocessorHost();
404    Coprocessor c = host.findCoprocessor(clazz.getName());
405    return (SampleRegionWALCoprocessor) c;
406  }
407
408  /**
409   * Creates an HRI around an HTD that has <code>tableName</code>.
410   * @param tableName Name of table to use.
411   */
412  private RegionInfo createBasicHRegionInfo(String tableName) {
413    return RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
414  }
415
416  /*
417   * @param p Directory to cleanup
418   */
419  private void deleteDir(final Path p) throws IOException {
420    if (this.fs.exists(p)) {
421      if (!this.fs.delete(p, true)) {
422        throw new IOException("Failed remove of " + p);
423      }
424    }
425  }
426
427  private Put creatPutWith2Families(byte[] row) throws IOException {
428    Put p = new Put(row);
429    for (int i = 0; i < TEST_FAMILY.length - 1; i++) {
430      p.addColumn(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]);
431    }
432    return p;
433  }
434
435  private Path runWALSplit(final Configuration c) throws IOException {
436    List<Path> splits = WALSplitter.split(
437      hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
438    // Split should generate only 1 file since there's only 1 region
439    assertEquals(1, splits.size());
440    // Make sure the file exists
441    assertTrue(fs.exists(splits.get(0)));
442    LOG.info("Split file=" + splits.get(0));
443    return splits.get(0);
444  }
445
446  private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName,
447      final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
448      final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc)
449      throws IOException {
450    String familyStr = Bytes.toString(family);
451    long txid = -1;
452    for (int j = 0; j < count; j++) {
453      byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
454      byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
455      WALEdit edit = new WALEdit();
456      edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
457      // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care
458      // about legacy coprocessors
459      txid = wal.append(hri,
460        new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit, true);
461    }
462    if (-1 != txid) {
463      wal.sync(txid);
464    }
465  }
466
467  private TableDescriptor getBasic3FamilyHTableDescriptor(TableName tableName) {
468    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
469    Arrays.stream(TEST_FAMILY).map(ColumnFamilyDescriptorBuilder::of)
470        .forEachOrdered(builder::setColumnFamily);
471    return builder.build();
472  }
473
474  private TableDescriptor createBasic3FamilyHTD(String tableName) {
475    return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
476        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("a"))
477        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("b"))
478        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("c")).build();
479  }
480}