001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.junit.jupiter.api.Assertions.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.regex.Pattern;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HBaseZKTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.StartTestingClusterOption;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.AsyncClusterConnection;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.master.MasterFileSystem;
049import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.FutureUtils;
054import org.apache.hadoop.hbase.wal.WAL.Entry;
055import org.apache.hadoop.hbase.wal.WALEdit;
056import org.apache.hadoop.hbase.wal.WALKeyImpl;
057import org.apache.hadoop.ipc.RemoteException;
058import org.junit.jupiter.api.AfterAll;
059
060import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
061
062/**
063 * Sync replication test base without BeforeAll method.
064 * @see SyncReplicationTestBase
065 */
066public class SyncReplicationTestBaseNoBeforeAll {
067
068  protected static final HBaseZKTestingUtil ZK_UTIL = new HBaseZKTestingUtil();
069
070  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
071
072  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
073
074  protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");
075
076  protected static byte[] CF = Bytes.toBytes("cf");
077
078  protected static byte[] CQ = Bytes.toBytes("cq");
079
080  protected static String PEER_ID = "1";
081
082  protected static Path REMOTE_WAL_DIR1;
083
084  protected static Path REMOTE_WAL_DIR2;
085
086  protected static void initTestingUtility(HBaseTestingUtil util, String zkParent) {
087    util.setZkCluster(ZK_UTIL.getZkCluster());
088    Configuration conf = util.getConfiguration();
089    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
090    conf.setInt("replication.source.size.capacity", 102400);
091    conf.setLong("replication.source.sleepforretries", 100);
092    conf.setInt("hbase.regionserver.maxlogs", 10);
093    conf.setLong("hbase.master.logcleaner.ttl", 10);
094    conf.setInt("zookeeper.recovery.retry", 1);
095    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
096    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
097    conf.setInt("replication.stats.thread.period.seconds", 5);
098    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
099    conf.setLong("replication.sleep.before.failover", 2000);
100    conf.setInt("replication.source.maxretriesmultiplier", 10);
101    conf.setFloat("replication.source.ratio", 1.0f);
102    conf.setBoolean("replication.source.eof.autorecovery", true);
103  }
104
105  protected static void startClusters() throws Exception {
106    ZK_UTIL.startMiniZKCluster();
107    initTestingUtility(UTIL1, "/cluster1");
108    initTestingUtility(UTIL2, "/cluster2");
109    StartTestingClusterOption option =
110      StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
111    UTIL1.startMiniCluster(option);
112    UTIL2.startMiniCluster(option);
113    TableDescriptor td =
114      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
115        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
116    UTIL1.getAdmin().createTable(td);
117    UTIL2.getAdmin().createTable(td);
118    FileSystem fs1 = UTIL1.getTestFileSystem();
119    FileSystem fs2 = UTIL2.getTestFileSystem();
120    REMOTE_WAL_DIR1 =
121      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
122        "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
123    REMOTE_WAL_DIR2 =
124      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
125        "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
126    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
127      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
128        .setReplicateAllUserTables(false)
129        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
130        .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build());
131    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
132      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getRpcConnnectionURI())
133        .setReplicateAllUserTables(false)
134        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
135        .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
136  }
137
138  private static void shutdown(HBaseTestingUtil util) throws Exception {
139    if (util.getHBaseCluster() == null) {
140      return;
141    }
142    Admin admin = util.getAdmin();
143    if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) {
144      if (
145        admin.getReplicationPeerSyncReplicationState(PEER_ID)
146            != SyncReplicationState.DOWNGRADE_ACTIVE
147      ) {
148        admin.transitReplicationPeerSyncReplicationState(PEER_ID,
149          SyncReplicationState.DOWNGRADE_ACTIVE);
150      }
151      admin.removeReplicationPeer(PEER_ID);
152    }
153    util.shutdownMiniCluster();
154  }
155
156  @AfterAll
157  public static void tearDown() throws Exception {
158    shutdown(UTIL1);
159    shutdown(UTIL2);
160    ZK_UTIL.shutdownMiniZKCluster();
161  }
162
163  protected final void write(HBaseTestingUtil util, int start, int end) throws IOException {
164    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
165      for (int i = start; i < end; i++) {
166        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
167      }
168    }
169  }
170
171  protected final void verify(HBaseTestingUtil util, int start, int end) throws IOException {
172    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
173      for (int i = start; i < end; i++) {
174        assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
175      }
176    }
177  }
178
179  protected final void verifyThroughRegion(HBaseTestingUtil util, int start, int end)
180    throws IOException {
181    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
182    for (int i = start; i < end; i++) {
183      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
184    }
185  }
186
187  protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtil util, int start, int end)
188    throws IOException {
189    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
190    for (int i = start; i < end; i++) {
191      assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty());
192    }
193  }
194
195  protected final void waitUntilReplicationDone(HBaseTestingUtil util, int end) throws Exception {
196    // The reject check is in RSRpcService so we can still read through HRegion
197    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
198    util.waitFor(30000, new ExplainingPredicate<Exception>() {
199
200      @Override
201      public boolean evaluate() throws Exception {
202        return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
203      }
204
205      @Override
206      public String explainFailure() throws Exception {
207        return "Replication has not been catched up yet";
208      }
209    });
210  }
211
212  protected final void writeAndVerifyReplication(HBaseTestingUtil util1, HBaseTestingUtil util2,
213    int start, int end) throws Exception {
214    write(util1, start, end);
215    waitUntilReplicationDone(util2, end);
216    verifyThroughRegion(util2, start, end);
217  }
218
219  protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
220    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
221    return getRemoteWALDir(remoteWALDir, peerId);
222  }
223
224  protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) {
225    return new Path(remoteWALDir, peerId);
226  }
227
228  protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
229    return new Path(remoteWALDir, peerId + "-replay");
230  }
231
232  protected final void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtil utility)
233    throws Exception {
234    ReplicationPeerStorage rps = ReplicationStorageFactory.getReplicationPeerStorage(
235      utility.getTestFileSystem(), utility.getZooKeeperWatcher(), utility.getConfiguration());
236    try {
237      rps.getPeerSyncReplicationState(peerId);
238      fail("Should throw exception when get the sync replication state of a removed peer.");
239    } catch (ReplicationException e) {
240      // ignore.
241    }
242    try {
243      rps.getPeerNewSyncReplicationState(peerId);
244      fail("Should throw exception when get the new sync replication state of a removed peer");
245    } catch (ReplicationException e) {
246      // ignore.
247    }
248    try (FileSystem fs = utility.getTestFileSystem()) {
249      assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId)));
250      assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId)));
251    }
252  }
253
254  private void assertRejection(Throwable error) {
255    assertThat(error, instanceOf(DoNotRetryIOException.class));
256    assertTrue(error.getMessage().contains("Reject to apply to sink cluster"));
257    assertTrue(error.getMessage().contains(TABLE_NAME.toString()));
258  }
259
260  protected final void verifyReplicationRequestRejection(HBaseTestingUtil utility,
261    boolean expectedRejection) throws Exception {
262    HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
263    AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
264    Entry[] entries = new Entry[10];
265    for (int i = 0; i < entries.length; i++) {
266      entries[i] =
267        new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
268    }
269    if (!expectedRejection) {
270      FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
271        connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
272        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
273    } else {
274      try {
275        FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
276          connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
277          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
278        fail("Should throw IOException when sync-replication state is in A or DA");
279      } catch (RemoteException e) {
280        assertRejection(e.unwrapRemoteException());
281      } catch (DoNotRetryIOException e) {
282        assertRejection(e);
283      }
284    }
285  }
286
287  protected final void waitUntilDeleted(HBaseTestingUtil util, Path remoteWAL) throws Exception {
288    MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem();
289    util.waitFor(30000, new ExplainingPredicate<Exception>() {
290
291      @Override
292      public boolean evaluate() throws Exception {
293        return !mfs.getWALFileSystem().exists(remoteWAL);
294      }
295
296      @Override
297      public String explainFailure() throws Exception {
298        return remoteWAL + " has not been deleted yet";
299      }
300    });
301  }
302}