001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER;
021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.io.IOException;
029import java.lang.reflect.Method;
030import java.net.BindException;
031import java.util.List;
032import java.util.NavigableMap;
033import java.util.TreeMap;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FSDataInputStream;
036import org.apache.hadoop.fs.FSDataOutputStream;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.Coprocessor;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.RegionInfoBuilder;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
055import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
056import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
057import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
058import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
059import org.apache.hadoop.hbase.testclassification.MediumTests;
060import org.apache.hadoop.hbase.testclassification.RegionServerTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.FSUtils;
063import org.apache.hadoop.hbase.util.Threads;
064import org.apache.hadoop.hbase.wal.WALFactory.Providers;
065import org.apache.hadoop.hdfs.DistributedFileSystem;
066import org.apache.hadoop.hdfs.MiniDFSCluster;
067import org.apache.hadoop.hdfs.protocol.HdfsConstants;
068import org.junit.After;
069import org.junit.AfterClass;
070import org.junit.Before;
071import org.junit.BeforeClass;
072import org.junit.ClassRule;
073import org.junit.Rule;
074import org.junit.Test;
075import org.junit.experimental.categories.Category;
076import org.junit.rules.TestName;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080/**
081 * WAL tests that can be reused across providers.
082 */
083@Category({RegionServerTests.class, MediumTests.class})
084public class TestWALFactory {
085
  /** Class-level HBase test rule bound to this test class. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALFactory.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class);

  // Shared configuration; assigned from TEST_UTIL in setUpBeforeClass().
  protected static Configuration conf;
  // Mini DFS cluster shared by all tests; testAppendClose() replaces it after restarting HDFS.
  private static MiniDFSCluster cluster;
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Root directory and WAL root directory, both created once in setUpBeforeClass().
  protected static Path hbaseDir;
  protected static Path hbaseWALDir;

  // Per-test state, (re)initialised in setUp().
  protected FileSystem fs;
  protected Path dir;
  protected WALFactory wals;
  private ServerName currentServername;

  /** Exposes the running test method's name; used to derive per-test paths and names. */
  @Rule
  public final TestName currentTest = new TestName();
105
106  @Before
107  public void setUp() throws Exception {
108    fs = cluster.getFileSystem();
109    dir = new Path(hbaseDir, currentTest.getMethodName());
110    this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1);
111    wals = new WALFactory(conf, this.currentServername.toString());
112  }
113
114  @After
115  public void tearDown() throws Exception {
116    // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
117    try {
118      wals.close();
119    } catch (IOException exception) {
120      LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" +
121          " may be the cause. Message: " + exception);
122      LOG.debug("Exception details for failure to close wal factory.", exception);
123    }
124    FileStatus[] entries = fs.listStatus(new Path("/"));
125    for (FileStatus dir : entries) {
126      fs.delete(dir.getPath(), true);
127    }
128  }
129
130  @BeforeClass
131  public static void setUpBeforeClass() throws Exception {
132    // Make block sizes small.
133    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
134    // needed for testAppendClose()
135    // quicker heartbeat interval for faster DN death notification
136    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
137    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
138    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
139
140    // faster failover with cluster.shutdown();fs.close() idiom
141    TEST_UTIL.getConfiguration()
142        .setInt("hbase.ipc.client.connect.max.retries", 1);
143    TEST_UTIL.getConfiguration().setInt(
144        "dfs.client.block.recovery.retries", 1);
145    TEST_UTIL.getConfiguration().setInt(
146      "hbase.ipc.client.connection.maxidletime", 500);
147    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
148    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
149    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
150        SampleRegionWALCoprocessor.class.getName());
151    TEST_UTIL.startMiniDFSCluster(3);
152
153    conf = TEST_UTIL.getConfiguration();
154    cluster = TEST_UTIL.getDFSCluster();
155
156    hbaseDir = TEST_UTIL.createRootDir();
157    hbaseWALDir = TEST_UTIL.createWALRootDir();
158  }
159
160  @AfterClass
161  public static void tearDownAfterClass() throws Exception {
162    TEST_UTIL.shutdownMiniCluster();
163  }
164
165  @Test
166  public void canCloseSingleton() throws IOException {
167    WALFactory.getInstance(conf).close();
168  }
169
170  /**
171   * Just write multiple logs then split.  Before fix for HADOOP-2283, this
172   * would fail.
173   * @throws IOException
174   */
175  @Test
176  public void testSplit() throws IOException {
177    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
178    final byte [] rowName = tableName.getName();
179    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
180    final int howmany = 3;
181    RegionInfo[] infos = new RegionInfo[3];
182    Path tabledir = FSUtils.getWALTableDir(conf, tableName);
183    fs.mkdirs(tabledir);
184    for (int i = 0; i < howmany; i++) {
185      infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
186          .setEndKey(Bytes.toBytes("" + (i + 1))).build();
187      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
188      LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
189    }
190    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
191    scopes.put(Bytes.toBytes("column"), 0);
192
193
194    // Add edits for three regions.
195    for (int ii = 0; ii < howmany; ii++) {
196      for (int i = 0; i < howmany; i++) {
197        final WAL log =
198            wals.getWAL(infos[i]);
199        for (int j = 0; j < howmany; j++) {
200          WALEdit edit = new WALEdit();
201          byte [] family = Bytes.toBytes("column");
202          byte [] qualifier = Bytes.toBytes(Integer.toString(j));
203          byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
204          edit.add(new KeyValue(rowName, family, qualifier,
205              System.currentTimeMillis(), column));
206          LOG.info("Region " + i + ": " + edit);
207          WALKeyImpl walKey =  new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
208              System.currentTimeMillis(), mvcc, scopes);
209          log.append(infos[i], walKey, edit, true);
210          walKey.getWriteEntry();
211        }
212        log.sync();
213        log.rollWriter(true);
214      }
215    }
216    wals.shutdown();
217    // The below calculation of logDir relies on insider information... WALSplitter should be connected better
218    // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used.
219    Path logDir =
220        new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME),
221            this.currentServername.toString());
222    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
223    List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals);
224    verifySplits(splits, howmany);
225  }
226
227  /**
228   * Test new HDFS-265 sync.
229   * @throws Exception
230   */
231  @Test
232  public void Broken_testSync() throws Exception {
233    TableName tableName = TableName.valueOf(currentTest.getMethodName());
234    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
235    // First verify that using streams all works.
236    Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
237    FSDataOutputStream out = fs.create(p);
238    out.write(tableName.getName());
239    Method syncMethod = null;
240    try {
241      syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
242    } catch (NoSuchMethodException e) {
243      try {
244        syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
245      } catch (NoSuchMethodException ex) {
246        fail("This version of Hadoop supports neither Syncable.sync() " +
247            "nor Syncable.hflush().");
248      }
249    }
250    syncMethod.invoke(out, new Object[]{});
251    FSDataInputStream in = fs.open(p);
252    assertTrue(in.available() > 0);
253    byte [] buffer = new byte [1024];
254    int read = in.read(buffer);
255    assertEquals(tableName.getName().length, read);
256    out.close();
257    in.close();
258
259    final int total = 20;
260    WAL.Reader reader = null;
261
262    try {
263      RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
264      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
265      scopes.put(tableName.getName(), 0);
266      final WAL wal = wals.getWAL(info);
267
268      for (int i = 0; i < total; i++) {
269        WALEdit kvs = new WALEdit();
270        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
271        wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
272            System.currentTimeMillis(), mvcc, scopes), kvs, true);
273      }
274      // Now call sync and try reading.  Opening a Reader before you sync just
275      // gives you EOFE.
276      wal.sync();
277      // Open a Reader.
278      Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
279      reader = wals.createReader(fs, walPath);
280      int count = 0;
281      WAL.Entry entry = new WAL.Entry();
282      while ((entry = reader.next(entry)) != null) count++;
283      assertEquals(total, count);
284      reader.close();
285      // Add test that checks to see that an open of a Reader works on a file
286      // that has had a sync done on it.
287      for (int i = 0; i < total; i++) {
288        WALEdit kvs = new WALEdit();
289        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
290        wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
291            System.currentTimeMillis(), mvcc, scopes), kvs, true);
292      }
293      wal.sync();
294      reader = wals.createReader(fs, walPath);
295      count = 0;
296      while((entry = reader.next(entry)) != null) count++;
297      assertTrue(count >= total);
298      reader.close();
299      // If I sync, should see double the edits.
300      wal.sync();
301      reader = wals.createReader(fs, walPath);
302      count = 0;
303      while((entry = reader.next(entry)) != null) count++;
304      assertEquals(total * 2, count);
305      reader.close();
306      // Now do a test that ensures stuff works when we go over block boundary,
307      // especially that we return good length on file.
308      final byte [] value = new byte[1025 * 1024];  // Make a 1M value.
309      for (int i = 0; i < total; i++) {
310        WALEdit kvs = new WALEdit();
311        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
312        wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
313            System.currentTimeMillis(), mvcc, scopes), kvs,  true);
314      }
315      // Now I should have written out lots of blocks.  Sync then read.
316      wal.sync();
317      reader = wals.createReader(fs, walPath);
318      count = 0;
319      while((entry = reader.next(entry)) != null) count++;
320      assertEquals(total * 3, count);
321      reader.close();
322      // shutdown and ensure that Reader gets right length also.
323      wal.shutdown();
324      reader = wals.createReader(fs, walPath);
325      count = 0;
326      while((entry = reader.next(entry)) != null) count++;
327      assertEquals(total * 3, count);
328      reader.close();
329    } finally {
330      if (reader != null) reader.close();
331    }
332  }
333
334  private void verifySplits(final List<Path> splits, final int howmany)
335  throws IOException {
336    assertEquals(howmany * howmany, splits.size());
337    for (int i = 0; i < splits.size(); i++) {
338      LOG.info("Verifying=" + splits.get(i));
339      WAL.Reader reader = wals.createReader(fs, splits.get(i));
340      try {
341        int count = 0;
342        String previousRegion = null;
343        long seqno = -1;
344        WAL.Entry entry = new WAL.Entry();
345        while((entry = reader.next(entry)) != null) {
346          WALKey key = entry.getKey();
347          String region = Bytes.toString(key.getEncodedRegionName());
348          // Assert that all edits are for same region.
349          if (previousRegion != null) {
350            assertEquals(previousRegion, region);
351          }
352          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId());
353          assertTrue(seqno < key.getSequenceId());
354          seqno = key.getSequenceId();
355          previousRegion = region;
356          count++;
357        }
358        assertEquals(howmany, count);
359      } finally {
360        reader.close();
361      }
362    }
363  }
364
365  /*
366   * We pass different values to recoverFileLease() so that different code paths are covered
367   *
368   * For this test to pass, requires:
369   * 1. HDFS-200 (append support)
370   * 2. HDFS-988 (SafeMode should freeze file operations
371   *              [FSNamesystem.nextGenerationStampForBlock])
372   * 3. HDFS-142 (on restart, maintain pendingCreates)
373   */
374  @Test
375  public void testAppendClose() throws Exception {
376    TableName tableName =
377        TableName.valueOf(currentTest.getMethodName());
378    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
379
380    WAL wal = wals.getWAL(regionInfo);
381    int total = 20;
382
383    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
384    scopes.put(tableName.getName(), 0);
385    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
386    for (int i = 0; i < total; i++) {
387      WALEdit kvs = new WALEdit();
388      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
389      wal.append(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
390          System.currentTimeMillis(), mvcc, scopes),
391        kvs, true);
392    }
393    // Now call sync to send the data to HDFS datanodes
394    wal.sync();
395     int namenodePort = cluster.getNameNodePort();
396    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
397
398
399    // Stop the cluster.  (ensure restart since we're sharing MiniDFSCluster)
400    try {
401      DistributedFileSystem dfs = cluster.getFileSystem();
402      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
403      TEST_UTIL.shutdownMiniDFSCluster();
404      try {
405        // wal.writer.close() will throw an exception,
406        // but still call this since it closes the LogSyncer thread first
407        wal.shutdown();
408      } catch (IOException e) {
409        LOG.info(e.toString(), e);
410      }
411      fs.close(); // closing FS last so DFSOutputStream can't call close
412      LOG.info("STOPPED first instance of the cluster");
413    } finally {
414      // Restart the cluster
415      while (cluster.isClusterUp()){
416        LOG.error("Waiting for cluster to go down");
417        Thread.sleep(1000);
418      }
419      assertFalse(cluster.isClusterUp());
420      cluster = null;
421      for (int i = 0; i < 100; i++) {
422        try {
423          cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
424          break;
425        } catch (BindException e) {
426          LOG.info("Sleeping.  BindException bringing up new cluster");
427          Threads.sleep(1000);
428        }
429      }
430      cluster.waitActive();
431      fs = cluster.getFileSystem();
432      LOG.info("STARTED second instance.");
433    }
434
435    // set the lease period to be 1 second so that the
436    // namenode triggers lease recovery upon append request
437    Method setLeasePeriod = cluster.getClass()
438      .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
439    setLeasePeriod.setAccessible(true);
440    setLeasePeriod.invoke(cluster, 1000L, 1000L);
441    try {
442      Thread.sleep(1000);
443    } catch (InterruptedException e) {
444      LOG.info(e.toString(), e);
445    }
446
447    // Now try recovering the log, like the HMaster would do
448    final FileSystem recoveredFs = fs;
449    final Configuration rlConf = conf;
450
451    class RecoverLogThread extends Thread {
452      public Exception exception = null;
453      @Override
454      public void run() {
455          try {
456            FSUtils.getInstance(fs, rlConf)
457              .recoverFileLease(recoveredFs, walPath, rlConf, null);
458          } catch (IOException e) {
459            exception = e;
460          }
461      }
462    }
463
464    RecoverLogThread t = new RecoverLogThread();
465    t.start();
466    // Timeout after 60 sec. Without correct patches, would be an infinite loop
467    t.join(60 * 1000);
468    if(t.isAlive()) {
469      t.interrupt();
470      throw new Exception("Timed out waiting for WAL.recoverLog()");
471    }
472
473    if (t.exception != null)
474      throw t.exception;
475
476    // Make sure you can read all the content
477    WAL.Reader reader = wals.createReader(fs, walPath);
478    int count = 0;
479    WAL.Entry entry = new WAL.Entry();
480    while (reader.next(entry) != null) {
481      count++;
482      assertTrue("Should be one KeyValue per WALEdit",
483                  entry.getEdit().getCells().size() == 1);
484    }
485    assertEquals(total, count);
486    reader.close();
487
488    // Reset the lease period
489    setLeasePeriod.invoke(cluster, new Object[]{ 60000L, 3600000L });
490  }
491
492  /**
493   * Tests that we can write out an edit, close, and then read it back in again.
494   */
495  @Test
496  public void testEditAdd() throws IOException {
497    int colCount = 10;
498    TableDescriptor htd =
499        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
500            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
501    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
502    for (byte[] fam : htd.getColumnFamilyNames()) {
503      scopes.put(fam, 0);
504    }
505    byte[] row = Bytes.toBytes("row");
506    WAL.Reader reader = null;
507    try {
508      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
509
510      // Write columns named 1, 2, 3, etc. and then values of single byte
511      // 1, 2, 3...
512      long timestamp = System.currentTimeMillis();
513      WALEdit cols = new WALEdit();
514      for (int i = 0; i < colCount; i++) {
515        cols.add(new KeyValue(row, Bytes.toBytes("column"),
516            Bytes.toBytes(Integer.toString(i)),
517          timestamp, new byte[] { (byte)(i + '0') }));
518      }
519      RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
520          .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
521      final WAL log = wals.getWAL(info);
522
523      final long txid = log.append(info,
524        new WALKeyImpl(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
525            mvcc, scopes),
526        cols, true);
527      log.sync(txid);
528      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
529      log.completeCacheFlush(info.getEncodedNameAsBytes());
530      log.shutdown();
531      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
532      // Now open a reader on the log and assert append worked.
533      reader = wals.createReader(fs, filename);
534      // Above we added all columns on a single row so we only read one
535      // entry in the below... thats why we have '1'.
536      for (int i = 0; i < 1; i++) {
537        WAL.Entry entry = reader.next(null);
538        if (entry == null) break;
539        WALKey key = entry.getKey();
540        WALEdit val = entry.getEdit();
541        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
542        assertTrue(htd.getTableName().equals(key.getTableName()));
543        Cell cell = val.getCells().get(0);
544        assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(),
545          cell.getRowLength()));
546        assertEquals((byte)(i + '0'), CellUtil.cloneValue(cell)[0]);
547        System.out.println(key + " " + val);
548      }
549    } finally {
550      if (reader != null) {
551        reader.close();
552      }
553    }
554  }
555
556  @Test
557  public void testAppend() throws IOException {
558    int colCount = 10;
559    TableDescriptor htd =
560        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
561            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
562    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
563    for (byte[] fam : htd.getColumnFamilyNames()) {
564      scopes.put(fam, 0);
565    }
566    byte[] row = Bytes.toBytes("row");
567    WAL.Reader reader = null;
568    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
569    try {
570      // Write columns named 1, 2, 3, etc. and then values of single byte
571      // 1, 2, 3...
572      long timestamp = System.currentTimeMillis();
573      WALEdit cols = new WALEdit();
574      for (int i = 0; i < colCount; i++) {
575        cols.add(new KeyValue(row, Bytes.toBytes("column"),
576          Bytes.toBytes(Integer.toString(i)),
577          timestamp, new byte[] { (byte)(i + '0') }));
578      }
579      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
580      final WAL log = wals.getWAL(hri);
581      final long txid = log.append(hri,
582        new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
583            mvcc, scopes),
584        cols, true);
585      log.sync(txid);
586      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
587      log.completeCacheFlush(hri.getEncodedNameAsBytes());
588      log.shutdown();
589      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
590      // Now open a reader on the log and assert append worked.
591      reader = wals.createReader(fs, filename);
592      WAL.Entry entry = reader.next();
593      assertEquals(colCount, entry.getEdit().size());
594      int idx = 0;
595      for (Cell val : entry.getEdit().getCells()) {
596        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
597          entry.getKey().getEncodedRegionName()));
598        assertTrue(htd.getTableName().equals(entry.getKey().getTableName()));
599        assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
600          val.getRowLength()));
601        assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]);
602        System.out.println(entry.getKey() + " " + val);
603        idx++;
604      }
605    } finally {
606      if (reader != null) {
607        reader.close();
608      }
609    }
610  }
611
612  /**
613   * Test that we can visit entries before they are appended
614   * @throws Exception
615   */
616  @Test
617  public void testVisitors() throws Exception {
618    final int COL_COUNT = 10;
619    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
620    final byte [] row = Bytes.toBytes("row");
621    final DumbWALActionsListener visitor = new DumbWALActionsListener();
622    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
623    long timestamp = System.currentTimeMillis();
624    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
625    scopes.put(Bytes.toBytes("column"), 0);
626
627    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
628    final WAL log = wals.getWAL(hri);
629    log.registerWALActionsListener(visitor);
630    for (int i = 0; i < COL_COUNT; i++) {
631      WALEdit cols = new WALEdit();
632      cols.add(new KeyValue(row, Bytes.toBytes("column"),
633          Bytes.toBytes(Integer.toString(i)),
634          timestamp, new byte[]{(byte) (i + '0')}));
635      log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
636          System.currentTimeMillis(), mvcc, scopes), cols, true);
637    }
638    log.sync();
639    assertEquals(COL_COUNT, visitor.increments);
640    log.unregisterWALActionsListener(visitor);
641    WALEdit cols = new WALEdit();
642    cols.add(new KeyValue(row, Bytes.toBytes("column"),
643        Bytes.toBytes(Integer.toString(11)),
644        timestamp, new byte[]{(byte) (11 + '0')}));
645    log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
646        System.currentTimeMillis(), mvcc, scopes), cols, true);
647    log.sync();
648    assertEquals(COL_COUNT, visitor.increments);
649  }
650
651  /**
652   * A loaded WAL coprocessor won't break existing WAL test cases.
653   */
654  @Test
655  public void testWALCoprocessorLoaded() throws Exception {
656    // test to see whether the coprocessor is loaded or not.
657    WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost();
658    Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
659    assertNotNull(c);
660  }
661
662  static class DumbWALActionsListener implements WALActionsListener {
663    int increments = 0;
664
665    @Override
666    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
667      increments++;
668    }
669
670    @Override
671    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
672      // To change body of implemented methods use File | Settings | File
673      // Templates.
674      increments++;
675    }
676  }
677
678  @Test
679  public void testWALProviders() throws IOException {
680    Configuration conf = new Configuration();
681    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
682    assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
683  }
684
685  @Test
686  public void testOnlySetWALProvider() throws IOException {
687    Configuration conf = new Configuration();
688    conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
689    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
690
691    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass());
692    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass());
693  }
694
695  @Test
696  public void testOnlySetMetaWALProvider() throws IOException {
697    Configuration conf = new Configuration();
698    conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
699    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
700
701    assertEquals(WALFactory.Providers.defaultProvider.clazz,
702        walFactory.getWALProvider().getClass());
703    assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass());
704  }
705
706  @Test
707  public void testDefaultProvider() throws IOException {
708    final Configuration conf = new Configuration();
709    // AsyncFSWal is the default, we should be able to request any WAL.
710    final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString());
711    Class<? extends WALProvider> fshLogProvider = normalWalFactory.getProviderClass(
712        WALFactory.WAL_PROVIDER, Providers.filesystem.name());
713    assertEquals(Providers.filesystem.clazz, fshLogProvider);
714
715    // Imagine a world where MultiWAL is the default
716    final WALFactory customizedWalFactory = new WALFactory(
717        conf, this.currentServername.toString())  {
718      @Override
719      Providers getDefaultProvider() {
720        return Providers.multiwal;
721      }
722    };
723    // If we don't specify a WALProvider, we should get the default implementation.
724    Class<? extends WALProvider> multiwalProviderClass = customizedWalFactory.getProviderClass(
725        WALFactory.WAL_PROVIDER, Providers.multiwal.name());
726    assertEquals(Providers.multiwal.clazz, multiwalProviderClass);
727  }
728}