/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER;
import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALFactory.Providers;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * WAL tests that can be reused across providers.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALFactory {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALFactory.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class);

  protected static Configuration conf;
  private static MiniDFSCluster cluster;
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  protected static Path hbaseDir;
  protected static Path hbaseWALDir;

  protected FileSystem fs;
  protected Path dir;
  protected WALFactory wals;
  private ServerName currentServername;

  @Rule
  public final TestName currentTest = new TestName();

  @Before
  public void setUp() throws Exception {
    fs = cluster.getFileSystem();
    dir = new Path(hbaseDir, currentTest.getMethodName());
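    // Use the test method name as the server name so each test method writes its WALs under its
    // own directory.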
    this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1);
    wals = new WALFactory(conf, this.currentServername.toString());
  }

  @After
  public void tearDown() throws Exception {
    // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
    try {
      wals.close();
    } catch (IOException exception) {
      LOG.warn("Encountered exception while closing wal factory. If you have other errors, this"
        + " may be the cause. Message: " + exception);
      LOG.debug("Exception details for failure to close wal factory.", exception);
    }
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal"));
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // needed for testAppendClose()
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      SampleRegionWALCoprocessor.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    conf = TEST_UTIL.getConfiguration();
    cluster = TEST_UTIL.getDFSCluster();

    hbaseDir = TEST_UTIL.createRootDir();
    hbaseWALDir = TEST_UTIL.createWALRootDir();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void canCloseSingleton() throws IOException {
    WALFactory.getInstance(conf).close();
  }

  /**
   * Just write multiple logs then split. Before fix for HADOOP-2283, this would fail.
   */
  @Test
  public void testSplit() throws IOException {
    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
    final byte[] rowName = tableName.getName();
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    final int howmany = 3;
    RegionInfo[] infos = new RegionInfo[3];
    Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
    fs.mkdirs(tableDataDir);
    Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName);
    fs.mkdirs(tabledir);
    for (int i = 0; i < howmany; i++) {
      infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
        .setEndKey(Bytes.toBytes("" + (i + 1))).build();
      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
      fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
      LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
    }
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(Bytes.toBytes("column"), 0);

    // Add edits for three regions.
    for (int ii = 0; ii < howmany; ii++) {
      for (int i = 0; i < howmany; i++) {
        final WAL log = wals.getWAL(infos[i]);
        for (int j = 0; j < howmany; j++) {
          WALEdit edit = new WALEdit();
          byte[] family = Bytes.toBytes("column");
          byte[] qualifier = Bytes.toBytes(Integer.toString(j));
          byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
          edit.add(
            new KeyValue(rowName, family, qualifier, EnvironmentEdgeManager.currentTime(), column));
          LOG.info("Region " + i + ": " + edit);
          WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
            EnvironmentEdgeManager.currentTime(), mvcc, scopes);
          log.appendData(infos[i], walKey, edit);
          walKey.getWriteEntry();
        }
        log.sync();
        log.rollWriter(true);
      }
    }
    wals.shutdown();
    // The calculation of logDir below relies on insider knowledge of the layout... WALSplitter
    // should be better integrated with the WAL system and not require an explicit path. The
    // oldLogDir is just made up and is not used.
    Path logDir = new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME),
      this.currentServername.toString());
    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals);
    verifySplits(splits, howmany);
  }

  /**
   * Test new HDFS-265 sync.
   */
  @Test
  public void Broken_testSync() throws Exception {
    TableName tableName = TableName.valueOf(currentTest.getMethodName());
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    // First verify that using streams all works.
    Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
    FSDataOutputStream out = fs.create(p);
    out.write(tableName.getName());
    Method syncMethod = null;
    try {
      syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
    } catch (NoSuchMethodException e) {
      try {
        syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
      } catch (NoSuchMethodException ex) {
        fail("This version of Hadoop supports neither Syncable.sync() nor Syncable.hflush().");
      }
    }
    syncMethod.invoke(out, new Object[] {});
    FSDataInputStream in = fs.open(p);
    assertTrue(in.available() > 0);
    byte[] buffer = new byte[1024];
    int read = in.read(buffer);
    assertEquals(tableName.getName().length, read);
    out.close();
    in.close();

    final int total = 20;
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(tableName.getName(), 0);
    final WAL wal = wals.getWAL(info);

    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
    }
    // Now call sync and try reading. Opening a Reader before you sync just
    // gives you an EOFException.
    wal.sync();
    // Open a Reader.
    Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
    int count = NoEOFWALStreamReader.count(wals, fs, walPath);
    assertEquals(total, count);
    // Check that opening a Reader works on a file that has had a sync done on it.
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
    }
    wal.sync();
    count = NoEOFWALStreamReader.count(wals, fs, walPath);
    assertTrue(count >= total);
    // If I sync, I should see double the edits.
    wal.sync();
    count = NoEOFWALStreamReader.count(wals, fs, walPath);
    assertEquals(total * 2, count);
    // Now make sure things still work when we go over a block boundary,
    // especially that we return a good length on the file.
    final byte[] value = new byte[1025 * 1024]; // A value slightly larger than the 1MB block size.
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
    }
    // Now I should have written out lots of blocks. Sync then read.
    wal.sync();
    count = NoEOFWALStreamReader.count(wals, fs, walPath);
    assertEquals(total * 3, count);
    // Shut down and ensure that the Reader also gets the right length.
    wal.shutdown();
    count = NoEOFWALStreamReader.count(wals, fs, walPath);
    assertEquals(total * 3, count);
  }

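  /**
   * Asserts that splitting produced {@code howmany * howmany} files and that each split file holds
   * {@code howmany} edits for a single region, in increasing sequence-id order.
   */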
  private void verifySplits(final List<Path> splits, final int howmany) throws IOException {
    assertEquals(howmany * howmany, splits.size());
    for (int i = 0; i < splits.size(); i++) {
      LOG.info("Verifying=" + splits.get(i));
      try (WALStreamReader reader = wals.createStreamReader(fs, splits.get(i))) {
        int count = 0;
        String previousRegion = null;
        long seqno = -1;
        WAL.Entry entry = new WAL.Entry();
        while ((entry = reader.next(entry)) != null) {
          WALKey key = entry.getKey();
          String region = Bytes.toString(key.getEncodedRegionName());
          // Assert that all edits are for same region.
          if (previousRegion != null) {
            assertEquals(previousRegion, region);
          }
          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId());
          assertTrue(seqno < key.getSequenceId());
          seqno = key.getSequenceId();
          previousRegion = region;
          count++;
        }
        assertEquals(howmany, count);
      }
    }
  }

  /*
   * We pass different values to recoverFileLease() so that different code paths are covered. For
   * this test to pass, it requires: 1. HDFS-200 (append support), 2. HDFS-988 (SafeMode should
   * freeze file operations [FSNamesystem.nextGenerationStampForBlock]), and 3. HDFS-142 (on
   * restart, maintain pendingCreates).
   */
  @Test
  public void testAppendClose() throws Exception {
    TableName tableName = TableName.valueOf(currentTest.getMethodName());
    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();

    WAL wal = wals.getWAL(regionInfo);
    int total = 20;

    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(tableName.getName(), 0);
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
      wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
    }
    // Now call sync to send the data to HDFS datanodes
    wal.sync();
    int namenodePort = cluster.getNameNodePort();
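    // Remember the namenode port so the cluster can be restarted on the same address below.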
    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);

    // Stop the cluster (and ensure a restart, since we are sharing the MiniDFSCluster).
    try {
      DistributedFileSystem dfs = cluster.getFileSystem();
      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
      TEST_UTIL.shutdownMiniDFSCluster();
      try {
        // wal.writer.close() will throw an exception,
        // but still call this since it closes the LogSyncer thread first
        wal.shutdown();
      } catch (IOException e) {
        LOG.info(e.toString(), e);
      }
      fs.close(); // closing FS last so DFSOutputStream can't call close
      LOG.info("STOPPED first instance of the cluster");
    } finally {
      // Restart the cluster
      while (cluster.isClusterUp()) {
        LOG.error("Waiting for cluster to go down");
        Thread.sleep(1000);
      }
      assertFalse(cluster.isClusterUp());
      cluster = null;
      for (int i = 0; i < 100; i++) {
        try {
          cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
          break;
        } catch (BindException e) {
          LOG.info("Sleeping.  BindException bringing up new cluster");
          Threads.sleep(1000);
        }
      }
      cluster.waitActive();
      fs = cluster.getFileSystem();
      LOG.info("STARTED second instance.");
    }

    // set the lease period to be 1 second so that the
    // namenode triggers lease recovery upon append request
    Method setLeasePeriod =
      cluster.getClass().getDeclaredMethod("setLeasePeriod", new Class[] { Long.TYPE, Long.TYPE });
    setLeasePeriod.setAccessible(true);
    setLeasePeriod.invoke(cluster, 1000L, 1000L);
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      LOG.info(e.toString(), e);
    }

    // Now try recovering the log, like the HMaster would do
    final FileSystem recoveredFs = fs;
    final Configuration rlConf = conf;

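    // Run lease recovery on its own thread so the test can enforce a timeout below if the
    // recovery call never returns.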
    class RecoverLogThread extends Thread {
      public Exception exception = null;

      @Override
      public void run() {
        try {
          RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null);
        } catch (IOException e) {
          exception = e;
        }
      }
    }

    RecoverLogThread t = new RecoverLogThread();
    t.start();
    // Timeout after 60 sec. Without the correct HDFS patches this would be an infinite loop.
    t.join(60 * 1000);
    if (t.isAlive()) {
      t.interrupt();
      throw new Exception("Timed out waiting for WAL.recoverLog()");
    }

    if (t.exception != null) throw t.exception;

    // Make sure you can read all the content
    int count = 0;
    try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, walPath)) {
      WAL.Entry entry = new WAL.Entry();
      while (reader.next(entry) != null) {
        count++;
        assertTrue("Should be one KeyValue per WALEdit", entry.getEdit().getCells().size() == 1);
      }
    }
    assertEquals(total, count);

    // Reset the lease period
    setLeasePeriod.invoke(cluster, new Object[] { 60000L, 3600000L });
  }

  /**
   * Tests that we can write out an edit, close, and then read it back in again.
   */
  @Test
  public void testEditAdd() throws IOException {
    int colCount = 10;
    TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    byte[] row = Bytes.toBytes("row");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);

    // Write columns named 1, 2, 3, etc. and then values of single byte
    // 1, 2, 3...
    long timestamp = EnvironmentEdgeManager.currentTime();
    WALEdit cols = new WALEdit();
    for (int i = 0; i < colCount; i++) {
      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
        timestamp, new byte[] { (byte) (i + '0') }));
    }
    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
      .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
    final WAL log = wals.getWAL(info);

    final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
    log.sync(txid);
    log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
    log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    log.shutdown();
    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
    // Now open a reader on the log and assert append worked.
    try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, filename)) {
      // Above we added all columns on a single row, so we only read one
      // entry below... that's why the loop bound is '1'.
      for (int i = 0; i < 1; i++) {
        WAL.Entry entry = reader.next(null);
        if (entry == null) break;
        WALKey key = entry.getKey();
        WALEdit val = entry.getEdit();
        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
        assertTrue(htd.getTableName().equals(key.getTableName()));
        Cell cell = val.getCells().get(0);
        assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(),
          cell.getRowLength()));
        assertEquals((byte) (i + '0'), CellUtil.cloneValue(cell)[0]);
        LOG.info(key + " " + val);
      }
    }
  }

  @Test
  public void testAppend() throws IOException {
    int colCount = 10;
    TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    byte[] row = Bytes.toBytes("row");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    // Write columns named 1, 2, 3, etc. and then values of single byte
    // 1, 2, 3...
    long timestamp = EnvironmentEdgeManager.currentTime();
    WALEdit cols = new WALEdit();
    for (int i = 0; i < colCount; i++) {
      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
        timestamp, new byte[] { (byte) (i + '0') }));
    }
    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    final WAL log = wals.getWAL(hri);
    final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
    log.sync(txid);
    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    log.shutdown();
    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
    // Now open a reader on the log and assert append worked.
    try (WALStreamReader reader = wals.createStreamReader(fs, filename)) {
      WAL.Entry entry = reader.next();
      assertEquals(colCount, entry.getEdit().size());
      int idx = 0;
      for (Cell val : entry.getEdit().getCells()) {
        assertTrue(
          Bytes.equals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()));
        assertTrue(htd.getTableName().equals(entry.getKey().getTableName()));
        assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
          val.getRowLength()));
        assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]);
        System.out.println(entry.getKey() + " " + val);
        idx++;
      }
    }
  }

  /**
   * Test that we can visit entries before they are appended
   */
  @Test
  public void testVisitors() throws Exception {
    final int COL_COUNT = 10;
    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
    final byte[] row = Bytes.toBytes("row");
    final DumbWALActionsListener visitor = new DumbWALActionsListener();
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    long timestamp = EnvironmentEdgeManager.currentTime();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(Bytes.toBytes("column"), 0);

    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
    final WAL log = wals.getWAL(hri);
    log.registerWALActionsListener(visitor);
    for (int i = 0; i < COL_COUNT; i++) {
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
        timestamp, new byte[] { (byte) (i + '0') }));
      log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
        EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
    }
    log.sync();
    assertEquals(COL_COUNT, visitor.increments);
    log.unregisterWALActionsListener(visitor);
    WALEdit cols = new WALEdit();
    cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)),
      timestamp, new byte[] { (byte) (11 + '0') }));
    log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
      EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
    log.sync();
    assertEquals(COL_COUNT, visitor.increments);
  }

  /**
   * A loaded WAL coprocessor won't break existing WAL test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // test to see whether the coprocessor is loaded or not.
    WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost();
    Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
    assertNotNull(c);
  }

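  /** Simple listener that counts how many times visitLogEntryBeforeWrite() is invoked. */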
  static class DumbWALActionsListener implements WALActionsListener {
    int increments = 0;

    @Override
    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
      increments++;
    }
  }

  @Test
  public void testWALProviders() throws IOException {
    Configuration conf = new Configuration();
    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
    assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());

    // when providers are not set and SyncReplicationWALProvider is not enabled
    walFactory = new WALFactory(conf, this.currentServername, null);
    assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
  }

  @Test
  public void testOnlySetWALProvider() throws IOException {
    Configuration conf = new Configuration();
    conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
    // The WALProvider and metaWALProvider classes are the same when metaWALProvider is not set.
    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass());
    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass());
  }

  @Test
  public void testOnlySetMetaWALProvider() throws IOException {
    Configuration conf = new Configuration();
    conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
    assertEquals(WALFactory.Providers.defaultProvider.clazz,
      walFactory.getWALProvider().getClass());
    assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass());
  }

  @Test
  public void testDefaultProvider() throws IOException {
    final Configuration conf = new Configuration();
    // AsyncFSWAL is the default; we should be able to request any WAL.
    final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString());
    Class<? extends WALProvider> fshLogProvider =
      normalWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
    assertEquals(Providers.filesystem.clazz, fshLogProvider);

    // Imagine a world where MultiWAL is the default
    final WALFactory customizedWalFactory =
      new WALFactory(conf, this.currentServername.toString()) {
        @Override
        Providers getDefaultProvider() {
          return Providers.multiwal;
        }
      };
    // If we don't specify a WALProvider, we should get the default implementation.
    Class<? extends WALProvider> multiwalProviderClass =
      customizedWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.multiwal.name());
    assertEquals(Providers.multiwal.clazz, multiwalProviderClass);
  }

  @Test
  public void testCustomProvider() throws IOException {
    final Configuration config = new Configuration();
    config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName());
    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
    Class<? extends WALProvider> walProvider =
      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
    assertEquals(IOTestProvider.class, walProvider);
    WALProvider metaWALProvider = walFactory.getMetaProvider();
    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
  }

  @Test
  public void testCustomMetaProvider() throws IOException {
    final Configuration config = new Configuration();
    config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName());
    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
    Class<? extends WALProvider> walProvider =
      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
    assertEquals(Providers.filesystem.clazz, walProvider);
    WALProvider metaWALProvider = walFactory.getMetaProvider();
    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
  }

  @Test
  public void testCustomReplicationProvider() throws IOException {
    final Configuration config = new Configuration();
    config.set(WALFactory.REPLICATION_WAL_PROVIDER, IOTestProvider.class.getName());
    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
    Class<? extends WALProvider> walProvider =
      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
    assertEquals(Providers.filesystem.clazz, walProvider);
    WALProvider replicationWALProvider = walFactory.getReplicationProvider();
    assertEquals(IOTestProvider.class, replicationWALProvider.getClass());
  }

  /**
   * Confirm that we will use different WALs for hbase:meta and hbase:replication
   */
  @Test
  public void testDifferentWALs() throws IOException {
    WAL normalWAL = wals.getWAL(null);
    WAL metaWAL = wals.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO);
    WAL replicationWAL = wals.getWAL(RegionInfoBuilder
      .newBuilder(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT).build());
    assertNotSame(normalWAL, metaWAL);
    assertNotSame(normalWAL, replicationWAL);
    assertNotSame(metaWAL, replicationWAL);
  }

  @Test
  public void testReaderClosedOnBadCodec() throws IOException {
    // Create our own Configuration and WALFactory to avoid breaking other test methods
    Configuration confWithCodec = new Configuration(conf);
    confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class,
      Codec.class);
    WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());

    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
    // the FileSystem and know if close() was called on those InputStreams.
    List<InputStreamProxy> openedReaders = new ArrayList<>();
    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
      @Override
      public FSDataInputStream open(Path p) throws IOException {
        InputStreamProxy is = new InputStreamProxy(super.open(p));
        openedReaders.add(is);
        return is;
      }

      @Override
      public FSDataInputStream open(Path p, int blockSize) throws IOException {
        InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
        openedReaders.add(is);
        return is;
      }
    };

    final TableDescriptor htd =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
    final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();

    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    byte[] row = Bytes.toBytes("row");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    // Write one column in one edit.
    WALEdit cols = new WALEdit();
    cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"),
      EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
    final WAL log = customFactory.getWAL(hri);
    final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
    // Sync the edit to the WAL
    log.sync(txid);
    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    log.shutdown();

    // Inject our failure; the codec object is constructed via reflection.
    BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);

    // Now open a reader on the log which will throw an exception when
    // we try to instantiate the custom Codec.
    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
    assertThrows("Expected to see an exception when creating WAL reader", IOException.class,
      () -> customFactory.createStreamReader(proxyFs, filename));
    // We should have exactly one reader
    assertEquals(1, openedReaders.size());
    // And that reader should be closed.
    long unclosedReaders =
      openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting());
    assertEquals("Should not find any open readers", 0, unclosedReaders);
  }

  /**
   * A proxy around FSDataInputStream which can report if close() was called.
   */
  private static class InputStreamProxy extends FSDataInputStream {
    private final InputStream real;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    public InputStreamProxy(InputStream real) {
      super(real);
      this.real = real;
    }

    @Override
    public void close() throws IOException {
      isClosed.set(true);
      real.close();
    }
  }

  /**
   * A custom WALCellCodec in which we can inject failure.
   */
  @SuppressWarnings("unused")
  private static class BrokenWALCellCodec extends WALCellCodec {
    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);

    static void maybeInjectFailure() {
      if (THROW_FAILURE_ON_INIT.get()) {
        throw new RuntimeException("Injected instantiation exception");
      }
    }

    public BrokenWALCellCodec() {
      super();
      maybeInjectFailure();
    }

    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
      super(conf, compression);
      maybeInjectFailure();
    }
  }
}