001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.CoreMatchers.not;
022import static org.hamcrest.MatcherAssert.assertThat;
023import static org.junit.Assert.assertEquals;
024
025import java.io.IOException;
026import java.util.Optional;
027import java.util.function.BiPredicate;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.RegionInfoBuilder;
035import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
036import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
037import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
038import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
039import org.apache.hadoop.hbase.replication.SyncReplicationState;
040import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.testclassification.RegionServerTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.hbase.util.Pair;
046import org.apache.hadoop.hdfs.DistributedFileSystem;
047import org.junit.AfterClass;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052
053@Category({ RegionServerTests.class, MediumTests.class })
054public class TestSyncReplicationWALProvider {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class);
059
060  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
061
062  private static String PEER_ID = "1";
063
064  private static String REMOTE_WAL_DIR = "/RemoteWAL";
065
066  private static TableName TABLE = TableName.valueOf("table");
067
068  private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep");
069
070  private static RegionInfo REGION = RegionInfoBuilder.newBuilder(TABLE).build();
071
072  private static RegionInfo REGION_NO_REP = RegionInfoBuilder.newBuilder(TABLE_NO_REP).build();
073
074  private static WALFactory FACTORY;
075
076  public static final class InfoProvider implements SyncReplicationPeerInfoProvider {
077
078    @Override
079    public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
080      if (table != null && table.equals(TABLE)) {
081        return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
082      } else {
083        return Optional.empty();
084      }
085    }
086
087    @Override
088    public boolean checkState(TableName table,
089        BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
090      return false;
091    }
092  }
093
094  @BeforeClass
095  public static void setUpBeforeClass() throws Exception {
096    UTIL.startMiniDFSCluster(3);
097    FACTORY = new WALFactory(UTIL.getConfiguration(), "test");
098    ((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider());
099    UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
100  }
101
102  @AfterClass
103  public static void tearDownAfterClass() throws IOException {
104    FACTORY.close();
105    UTIL.shutdownMiniDFSCluster();
106  }
107
108  private void testReadWrite(DualAsyncFSWAL wal) throws Exception {
109    int recordCount = 100;
110    int columnCount = 10;
111    byte[] row = Bytes.toBytes("testRow");
112    long timestamp = EnvironmentEdgeManager.currentTime();
113    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
114    ProtobufLogTestHelper.doWrite(wal, REGION, TABLE, columnCount, recordCount, row, timestamp,
115      mvcc);
116    Path localFile = wal.getCurrentFileName();
117    Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName());
118    try (ProtobufLogReader reader =
119      (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
120      ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
121        timestamp);
122    }
123    try (ProtobufLogReader reader =
124      (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
125      ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
126        timestamp);
127    }
128    wal.rollWriter();
129    DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem();
130    UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
131
132      @Override
133      public boolean evaluate() throws Exception {
134        return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile);
135      }
136
137      @Override
138      public String explainFailure() throws Exception {
139        StringBuilder sb = new StringBuilder();
140        if (!dfs.isFileClosed(localFile)) {
141          sb.append(localFile + " has not been closed yet.");
142        }
143        if (!dfs.isFileClosed(remoteFile)) {
144          sb.append(remoteFile + " has not been closed yet.");
145        }
146        return sb.toString();
147      }
148    });
149    try (ProtobufLogReader reader =
150      (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
151      ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
152        timestamp);
153    }
154    try (ProtobufLogReader reader =
155      (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
156      ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
157        timestamp);
158    }
159  }
160
161  @Test
162  public void test() throws Exception {
163    WAL walNoRep = FACTORY.getWAL(REGION_NO_REP);
164    assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class)));
165    DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
166    assertEquals(2, FACTORY.getWALs().size());
167    testReadWrite(wal);
168    SyncReplicationWALProvider walProvider = (SyncReplicationWALProvider) FACTORY.getWALProvider();
169    walProvider.peerSyncReplicationStateChange(PEER_ID, SyncReplicationState.ACTIVE,
170      SyncReplicationState.DOWNGRADE_ACTIVE, 1);
171    assertEquals(1, FACTORY.getWALs().size());
172  }
173}