001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertThat;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Comparator;
032import java.util.List;
033import java.util.NavigableMap;
034import java.util.Set;
035import java.util.TreeMap;
036import java.util.UUID;
037import java.util.concurrent.atomic.AtomicBoolean;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileStatus;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.CellScanner;
043import org.apache.hadoop.hbase.Coprocessor;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.Result;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
057import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
058import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
059import org.apache.hadoop.hbase.regionserver.HRegion;
060import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
061import org.apache.hadoop.hbase.regionserver.SequenceId;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.CommonFSUtils;
064import org.apache.hadoop.hbase.util.EnvironmentEdge;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.hadoop.hbase.util.Threads;
067import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
068import org.apache.hadoop.hbase.wal.WAL;
069import org.apache.hadoop.hbase.wal.WALEdit;
070import org.apache.hadoop.hbase.wal.WALKey;
071import org.apache.hadoop.hbase.wal.WALKeyImpl;
072import org.junit.AfterClass;
073import org.junit.Before;
074import org.junit.BeforeClass;
075import org.junit.Rule;
076import org.junit.Test;
077import org.junit.rules.TestName;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081public abstract class AbstractTestFSWAL {
082
083  protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFSWAL.class);
084
085  protected static Configuration CONF;
086  protected static FileSystem FS;
087  protected static Path DIR;
088  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
089
090  @Rule
091  public final TestName currentTest = new TestName();
092
093  @Before
094  public void setUp() throws Exception {
095    FileStatus[] entries = FS.listStatus(new Path("/"));
096    for (FileStatus dir : entries) {
097      FS.delete(dir.getPath(), true);
098    }
099    final Path hbaseDir = TEST_UTIL.createRootDir();
100    final Path hbaseWALDir = TEST_UTIL.createWALRootDir();
101    DIR = new Path(hbaseWALDir, currentTest.getMethodName());
102    assertNotEquals(hbaseDir, hbaseWALDir);
103  }
104
105  @BeforeClass
106  public static void setUpBeforeClass() throws Exception {
107    // Make block sizes small.
108    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
109    // quicker heartbeat interval for faster DN death notification
110    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
111    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
112    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
113
114    // faster failover with cluster.shutdown();fs.close() idiom
115    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
116    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
117    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
118    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
119      SampleRegionWALCoprocessor.class.getName());
120    TEST_UTIL.startMiniDFSCluster(3);
121
122    CONF = TEST_UTIL.getConfiguration();
123    FS = TEST_UTIL.getDFSCluster().getFileSystem();
124  }
125
  /**
   * Runs once after all tests in the class and calls {@code shutdownMiniCluster()} on the shared
   * {@link #TEST_UTIL}, whose mini DFS cluster is started in {@link #setUpBeforeClass()}.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
130
  /**
   * Factory hook: concrete test classes return the {@link AbstractFSWAL} implementation under
   * test.
   * <p>
   * The tests in this class always pass {@code null} for {@code listeners} and {@code prefix},
   * {@code true} for {@code failIfWALExists}, and {@code null} for {@code suffix} except when a
   * meta WAL is requested.
   * NOTE(review): the arguments presumably map one-to-one onto the implementation's constructor;
   * confirm in the subclasses.
   * @param fs file system the WAL is created on
   * @param rootDir WAL root directory (tests pass {@code CommonFSUtils.getWALRootDir(conf)})
   * @param WALDir name of the directory holding the live WAL files
   * @param archiveDir name of the directory for archived WAL files
   * @param conf configuration handed to the WAL
   * @param listeners WAL action listeners, may be {@code null}
   * @param failIfWALExists presumably makes creation fail when the WAL directory already exists;
   *          TODO confirm against the implementation
   * @param prefix WAL file name prefix, may be {@code null}
   * @param suffix WAL file name suffix, may be {@code null}
   * @return the newly created WAL
   * @throws IOException if the WAL cannot be created
   */
  protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String WALDir,
      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
      boolean failIfWALExists, String prefix, String suffix) throws IOException;
134
  /**
   * Factory hook like {@code newWAL}, with an additional {@code action} callback supplied by the
   * test.
   * <p>
   * The only caller in this class passes a {@link Runnable} that sleeps while a flag is set, in
   * order to slow down appends.
   * NOTE(review): the implementation presumably runs {@code action} before each append; confirm
   * in the subclasses.
   * @param fs file system the WAL is created on
   * @param rootDir WAL root directory
   * @param WALDir name of the directory holding the live WAL files
   * @param archiveDir name of the directory for archived WAL files
   * @param conf configuration handed to the WAL
   * @param listeners WAL action listeners, may be {@code null}
   * @param failIfWALExists presumably makes creation fail when the WAL directory already exists;
   *          TODO confirm against the implementation
   * @param prefix WAL file name prefix, may be {@code null}
   * @param suffix WAL file name suffix, may be {@code null}
   * @param action callback the doctored WAL is expected to invoke
   * @return the newly created WAL
   * @throws IOException if the WAL cannot be created
   */
  protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String WALDir,
      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
      boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException;
138
139  /**
140   * A loaded WAL coprocessor won't break existing WAL test cases.
141   */
142  @Test
143  public void testWALCoprocessorLoaded() throws Exception {
144    // test to see whether the coprocessor is loaded or not.
145    AbstractFSWAL<?> wal = null;
146    try {
147      wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
148          HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
149      WALCoprocessorHost host = wal.getCoprocessorHost();
150      Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
151      assertNotNull(c);
152    } finally {
153      if (wal != null) {
154        wal.close();
155      }
156    }
157  }
158
159  protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
160      MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
161      throws IOException {
162    final byte[] row = Bytes.toBytes("row");
163    for (int i = 0; i < times; i++) {
164      long timestamp = System.currentTimeMillis();
165      WALEdit cols = new WALEdit();
166      cols.add(new KeyValue(row, row, row, timestamp, row));
167      WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(),
168          SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
169          HConstants.NO_NONCE, mvcc, scopes);
170      log.append(hri, key, cols, true);
171    }
172    log.sync();
173  }
174
  /**
   * Helper method to simulate a region flush for a WAL: starts a cache flush for the region and
   * immediately completes it.
   * @param wal the WAL to notify
   * @param regionEncodedName encoded name of the region being "flushed"
   * @param flushedFamilyNames column families passed to {@code startCacheFlush}
   */
  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
    wal.completeCacheFlush(regionEncodedName);
  }
184
185  /**
186   * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws
187   * exception if we do). Comparison is based on the timestamp present in the wal name.
188   * @throws Exception
189   */
190  @Test
191  public void testWALComparator() throws Exception {
192    AbstractFSWAL<?> wal1 = null;
193    AbstractFSWAL<?> walMeta = null;
194    try {
195      wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
196          HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
197      LOG.debug("Log obtained is: " + wal1);
198      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
199      Path p1 = wal1.computeFilename(11);
200      Path p2 = wal1.computeFilename(12);
201      // comparing with itself returns 0
202      assertTrue(comp.compare(p1, p1) == 0);
203      // comparing with different filenum.
204      assertTrue(comp.compare(p1, p2) < 0);
205      walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
206          HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
207          AbstractFSWALProvider.META_WAL_PROVIDER_ID);
208      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
209
210      Path p1WithMeta = walMeta.computeFilename(11);
211      Path p2WithMeta = walMeta.computeFilename(12);
212      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
213      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
214      // mixing meta and non-meta logs gives error
215      boolean ex = false;
216      try {
217        comp.compare(p1WithMeta, p2);
218      } catch (IllegalArgumentException e) {
219        ex = true;
220      }
221      assertTrue("Comparator doesn't complain while checking meta log files", ex);
222      boolean exMeta = false;
223      try {
224        compMeta.compare(p1WithMeta, p2);
225      } catch (IllegalArgumentException e) {
226        exMeta = true;
227      }
228      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
229    } finally {
230      if (wal1 != null) {
231        wal1.close();
232      }
233      if (walMeta != null) {
234        walMeta.close();
235      }
236    }
237  }
238
239  /**
240   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
241   * regions which should be flushed in order to archive the oldest wal file.
242   * <p>
243   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
244   * the max number of logs threshold. It checks whether we get the "right regions" for flush on
245   * rolling the wal.
246   * @throws Exception
247   */
248  @Test
249  public void testFindMemStoresEligibleForFlush() throws Exception {
250    LOG.debug("testFindMemStoresEligibleForFlush");
251    Configuration conf1 = HBaseConfiguration.create(CONF);
252    conf1.setInt("hbase.regionserver.maxlogs", 1);
253    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
254      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
255    TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
256      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
257    TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
258      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
259    RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
260    RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
261    // add edits and roll the wal
262    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
263    NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
264    for (byte[] fam : t1.getColumnFamilyNames()) {
265      scopes1.put(fam, 0);
266    }
267    NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
268    for (byte[] fam : t2.getColumnFamilyNames()) {
269      scopes2.put(fam, 0);
270    }
271    try {
272      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
273      wal.rollWriter();
274      // add some more edits and roll the wal. This would reach the log number threshold
275      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
276      wal.rollWriter();
277      // with above rollWriter call, the max logs limit is reached.
278      assertTrue(wal.getNumRolledLogFiles() == 2);
279
280      // get the regions to flush; since there is only one region in the oldest wal, it should
281      // return only one region.
282      byte[][] regionsToFlush = wal.findRegionsToForceFlush();
283      assertEquals(1, regionsToFlush.length);
284      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
285      // insert edits in second region
286      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
287      // get the regions to flush, it should still read region1.
288      regionsToFlush = wal.findRegionsToForceFlush();
289      assertEquals(1, regionsToFlush.length);
290      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
291      // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
292      // remain.
293      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
294      wal.rollWriter();
295      // only one wal should remain now (that is for the second region).
296      assertEquals(1, wal.getNumRolledLogFiles());
297      // flush the second region
298      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
299      wal.rollWriter(true);
300      // no wal should remain now.
301      assertEquals(0, wal.getNumRolledLogFiles());
302      // add edits both to region 1 and region 2, and roll.
303      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
304      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
305      wal.rollWriter();
306      // add edits and roll the writer, to reach the max logs limit.
307      assertEquals(1, wal.getNumRolledLogFiles());
308      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
309      wal.rollWriter();
310      // it should return two regions to flush, as the oldest wal file has entries
311      // for both regions.
312      regionsToFlush = wal.findRegionsToForceFlush();
313      assertEquals(2, regionsToFlush.length);
314      // flush both regions
315      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
316      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
317      wal.rollWriter(true);
318      assertEquals(0, wal.getNumRolledLogFiles());
319      // Add an edit to region1, and roll the wal.
320      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
321      // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
322      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
323      wal.rollWriter();
324      wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
325      assertEquals(1, wal.getNumRolledLogFiles());
326    } finally {
327      if (wal != null) {
328        wal.close();
329      }
330    }
331  }
332
333  @Test(expected = IOException.class)
334  public void testFailedToCreateWALIfParentRenamed() throws IOException,
335      CommonFSUtils.StreamLacksCapabilityException {
336    final String name = "testFailedToCreateWALIfParentRenamed";
337    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name,
338      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
339    long filenum = System.currentTimeMillis();
340    Path path = wal.computeFilename(filenum);
341    wal.createWriterInstance(path);
342    Path parent = path.getParent();
343    path = wal.computeFilename(filenum + 1);
344    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
345    FS.rename(parent, newPath);
346    wal.createWriterInstance(path);
347    fail("It should fail to create the new WAL");
348  }
349
350  /**
351   * Test flush for sure has a sequence id that is beyond the last edit appended. We do this by
352   * slowing appends in the background ring buffer thread while in foreground we call flush. The
353   * addition of the sync over HRegion in flush should fix an issue where flush was returning before
354   * all of its appends had made it out to the WAL (HBASE-11109).
355   * @throws IOException
356   * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
357   */
358  @Test
359  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
360    String testName = currentTest.getMethodName();
361    final TableName tableName = TableName.valueOf(testName);
362    final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
363    final byte[] rowName = tableName.getName();
364    final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
365      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
366    HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
367      TEST_UTIL.getConfiguration(), htd);
368    HBaseTestingUtility.closeRegionAndWAL(r);
369    final int countPerFamily = 10;
370    final AtomicBoolean goslow = new AtomicBoolean(false);
371    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
372    for (byte[] fam : htd.getColumnFamilyNames()) {
373      scopes.put(fam, 0);
374    }
375    // subclass and doctor a method.
376    AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
377        testName, CONF, null, true, null, null, new Runnable() {
378
379          @Override
380          public void run() {
381            if (goslow.get()) {
382              Threads.sleep(100);
383              LOG.debug("Sleeping before appending 100ms");
384            }
385          }
386        });
387    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
388      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
389    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
390    try {
391      List<Put> puts = null;
392      for (byte[] fam : htd.getColumnFamilyNames()) {
393        puts =
394            TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x");
395      }
396
397      // Now assert edits made it in.
398      final Get g = new Get(rowName);
399      Result result = region.get(g);
400      assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size());
401
402      // Construct a WALEdit and add it a few times to the WAL.
403      WALEdit edits = new WALEdit();
404      for (Put p : puts) {
405        CellScanner cs = p.cellScanner();
406        while (cs.advance()) {
407          edits.add(cs.current());
408        }
409      }
410      // Add any old cluster id.
411      List<UUID> clusterIds = new ArrayList<>(1);
412      clusterIds.add(UUID.randomUUID());
413      // Now make appends run slow.
414      goslow.set(true);
415      for (int i = 0; i < countPerFamily; i++) {
416        final RegionInfo info = region.getRegionInfo();
417        final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
418            System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
419        wal.append(info, logkey, edits, true);
420        region.getMVCC().completeAndWait(logkey.getWriteEntry());
421      }
422      region.flush(true);
423      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
424      long currentSequenceId = region.getReadPoint(null);
425      // Now release the appends
426      goslow.set(false);
427      assertTrue(currentSequenceId >= region.getReadPoint(null));
428    } finally {
429      region.close(true);
430      wal.close();
431    }
432  }
433
434  @Test
435  public void testSyncNoAppend() throws IOException {
436    String testName = currentTest.getMethodName();
437    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
438        CONF, null, true, null, null);
439    try {
440      wal.sync();
441    } finally {
442      wal.close();
443    }
444  }
445
446  @Test
447  public void testWriteEntryCanBeNull() throws IOException {
448    String testName = currentTest.getMethodName();
449    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
450      CONF, null, true, null, null);
451    wal.close();
452    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
453      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
454    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
455    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
456    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
457    for (byte[] fam : td.getColumnFamilyNames()) {
458      scopes.put(fam, 0);
459    }
460    long timestamp = System.currentTimeMillis();
461    byte[] row = Bytes.toBytes("row");
462    WALEdit cols = new WALEdit();
463    cols.add(new KeyValue(row, row, row, timestamp, row));
464    WALKeyImpl key =
465        new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
466          timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
467    try {
468      wal.append(ri, key, cols, true);
469      fail("Should fail since the wal has already been closed");
470    } catch (IOException e) {
471      // expected
472      assertThat(e.getMessage(), containsString("log is closed"));
473      // the WriteEntry should be null since we fail before setting it.
474      assertNull(key.getWriteEntry());
475    }
476  }
477}