001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.regex.Pattern;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HBaseZKTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.StartTestingClusterOption;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.AsyncClusterConnection;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.master.MasterFileSystem;
049import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.FutureUtils;
054import org.apache.hadoop.hbase.wal.WAL.Entry;
055import org.apache.hadoop.hbase.wal.WALEdit;
056import org.apache.hadoop.hbase.wal.WALKeyImpl;
057import org.apache.hadoop.ipc.RemoteException;
058import org.junit.AfterClass;
059import org.junit.BeforeClass;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
062
063/**
064 * Base class for testing sync replication.
065 */
066public class SyncReplicationTestBase {
067
068  protected static final HBaseZKTestingUtil ZK_UTIL = new HBaseZKTestingUtil();
069
070  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
071
072  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
073
074  protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");
075
076  protected static byte[] CF = Bytes.toBytes("cf");
077
078  protected static byte[] CQ = Bytes.toBytes("cq");
079
080  protected static String PEER_ID = "1";
081
082  protected static Path REMOTE_WAL_DIR1;
083
084  protected static Path REMOTE_WAL_DIR2;
085
086  protected static void initTestingUtility(HBaseTestingUtil util, String zkParent) {
087    util.setZkCluster(ZK_UTIL.getZkCluster());
088    Configuration conf = util.getConfiguration();
089    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
090    conf.setInt("replication.source.size.capacity", 102400);
091    conf.setLong("replication.source.sleepforretries", 100);
092    conf.setInt("hbase.regionserver.maxlogs", 10);
093    conf.setLong("hbase.master.logcleaner.ttl", 10);
094    conf.setInt("zookeeper.recovery.retry", 1);
095    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
096    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
097    conf.setInt("replication.stats.thread.period.seconds", 5);
098    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
099    conf.setLong("replication.sleep.before.failover", 2000);
100    conf.setInt("replication.source.maxretriesmultiplier", 10);
101    conf.setFloat("replication.source.ratio", 1.0f);
102    conf.setBoolean("replication.source.eof.autorecovery", true);
103  }
104
105  @BeforeClass
106  public static void setUp() throws Exception {
107    ZK_UTIL.startMiniZKCluster();
108    initTestingUtility(UTIL1, "/cluster1");
109    initTestingUtility(UTIL2, "/cluster2");
110    StartTestingClusterOption option =
111      StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
112    UTIL1.startMiniCluster(option);
113    UTIL2.startMiniCluster(option);
114    TableDescriptor td =
115      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
116        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
117    UTIL1.getAdmin().createTable(td);
118    UTIL2.getAdmin().createTable(td);
119    FileSystem fs1 = UTIL1.getTestFileSystem();
120    FileSystem fs2 = UTIL2.getTestFileSystem();
121    REMOTE_WAL_DIR1 =
122      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
123        "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
124    REMOTE_WAL_DIR2 =
125      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
126        "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
127    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
128      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
129        .setReplicateAllUserTables(false)
130        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
131        .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build());
132    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
133      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getRpcConnnectionURI())
134        .setReplicateAllUserTables(false)
135        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
136        .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
137  }
138
139  private static void shutdown(HBaseTestingUtil util) throws Exception {
140    if (util.getHBaseCluster() == null) {
141      return;
142    }
143    Admin admin = util.getAdmin();
144    if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) {
145      if (
146        admin.getReplicationPeerSyncReplicationState(PEER_ID)
147            != SyncReplicationState.DOWNGRADE_ACTIVE
148      ) {
149        admin.transitReplicationPeerSyncReplicationState(PEER_ID,
150          SyncReplicationState.DOWNGRADE_ACTIVE);
151      }
152      admin.removeReplicationPeer(PEER_ID);
153    }
154    util.shutdownMiniCluster();
155  }
156
157  @AfterClass
158  public static void tearDown() throws Exception {
159    shutdown(UTIL1);
160    shutdown(UTIL2);
161    ZK_UTIL.shutdownMiniZKCluster();
162  }
163
164  protected final void write(HBaseTestingUtil util, int start, int end) throws IOException {
165    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
166      for (int i = start; i < end; i++) {
167        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
168      }
169    }
170  }
171
172  protected final void verify(HBaseTestingUtil util, int start, int end) throws IOException {
173    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
174      for (int i = start; i < end; i++) {
175        assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
176      }
177    }
178  }
179
180  protected final void verifyThroughRegion(HBaseTestingUtil util, int start, int end)
181    throws IOException {
182    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
183    for (int i = start; i < end; i++) {
184      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
185    }
186  }
187
188  protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtil util, int start, int end)
189    throws IOException {
190    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
191    for (int i = start; i < end; i++) {
192      assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty());
193    }
194  }
195
196  protected final void waitUntilReplicationDone(HBaseTestingUtil util, int end) throws Exception {
197    // The reject check is in RSRpcService so we can still read through HRegion
198    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
199    util.waitFor(30000, new ExplainingPredicate<Exception>() {
200
201      @Override
202      public boolean evaluate() throws Exception {
203        return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
204      }
205
206      @Override
207      public String explainFailure() throws Exception {
208        return "Replication has not been catched up yet";
209      }
210    });
211  }
212
213  protected final void writeAndVerifyReplication(HBaseTestingUtil util1, HBaseTestingUtil util2,
214    int start, int end) throws Exception {
215    write(util1, start, end);
216    waitUntilReplicationDone(util2, end);
217    verifyThroughRegion(util2, start, end);
218  }
219
220  protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
221    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
222    return getRemoteWALDir(remoteWALDir, peerId);
223  }
224
225  protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) {
226    return new Path(remoteWALDir, peerId);
227  }
228
229  protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
230    return new Path(remoteWALDir, peerId + "-replay");
231  }
232
233  protected final void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtil utility)
234    throws Exception {
235    ReplicationPeerStorage rps = ReplicationStorageFactory.getReplicationPeerStorage(
236      utility.getTestFileSystem(), utility.getZooKeeperWatcher(), utility.getConfiguration());
237    try {
238      rps.getPeerSyncReplicationState(peerId);
239      fail("Should throw exception when get the sync replication state of a removed peer.");
240    } catch (ReplicationException e) {
241      // ignore.
242    }
243    try {
244      rps.getPeerNewSyncReplicationState(peerId);
245      fail("Should throw exception when get the new sync replication state of a removed peer");
246    } catch (ReplicationException e) {
247      // ignore.
248    }
249    try (FileSystem fs = utility.getTestFileSystem()) {
250      assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId)));
251      assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId)));
252    }
253  }
254
255  private void assertRejection(Throwable error) {
256    assertThat(error, instanceOf(DoNotRetryIOException.class));
257    assertTrue(error.getMessage().contains("Reject to apply to sink cluster"));
258    assertTrue(error.getMessage().contains(TABLE_NAME.toString()));
259  }
260
261  protected final void verifyReplicationRequestRejection(HBaseTestingUtil utility,
262    boolean expectedRejection) throws Exception {
263    HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
264    AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
265    Entry[] entries = new Entry[10];
266    for (int i = 0; i < entries.length; i++) {
267      entries[i] =
268        new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
269    }
270    if (!expectedRejection) {
271      FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
272        connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
273        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
274    } else {
275      try {
276        FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
277          connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
278          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
279        fail("Should throw IOException when sync-replication state is in A or DA");
280      } catch (RemoteException e) {
281        assertRejection(e.unwrapRemoteException());
282      } catch (DoNotRetryIOException e) {
283        assertRejection(e);
284      }
285    }
286  }
287
288  protected final void waitUntilDeleted(HBaseTestingUtil util, Path remoteWAL) throws Exception {
289    MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem();
290    util.waitFor(30000, new ExplainingPredicate<Exception>() {
291
292      @Override
293      public boolean evaluate() throws Exception {
294        return !mfs.getWALFileSystem().exists(remoteWAL);
295      }
296
297      @Override
298      public String explainFailure() throws Exception {
299        return remoteWAL + " has not been deleted yet";
300      }
301    });
302  }
303}