001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.regex.Pattern;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HBaseZKTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.StartTestingClusterOption;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.AsyncClusterConnection;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.master.MasterFileSystem;
049import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.FutureUtils;
054import org.apache.hadoop.hbase.wal.WAL.Entry;
055import org.apache.hadoop.hbase.wal.WALEdit;
056import org.apache.hadoop.hbase.wal.WALKeyImpl;
057import org.apache.hadoop.ipc.RemoteException;
058import org.junit.AfterClass;
059import org.junit.jupiter.api.AfterAll;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
062
063/**
064 * Sync replication test base without BeforeAll method.
065 * @see SyncReplicationTestBase
066 */
067public class SyncReplicationTestBaseNoBeforeAll {
068
069  protected static final HBaseZKTestingUtil ZK_UTIL = new HBaseZKTestingUtil();
070
071  protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
072
073  protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
074
075  protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");
076
077  protected static byte[] CF = Bytes.toBytes("cf");
078
079  protected static byte[] CQ = Bytes.toBytes("cq");
080
081  protected static String PEER_ID = "1";
082
083  protected static Path REMOTE_WAL_DIR1;
084
085  protected static Path REMOTE_WAL_DIR2;
086
087  protected static void initTestingUtility(HBaseTestingUtil util, String zkParent) {
088    util.setZkCluster(ZK_UTIL.getZkCluster());
089    Configuration conf = util.getConfiguration();
090    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
091    conf.setInt("replication.source.size.capacity", 102400);
092    conf.setLong("replication.source.sleepforretries", 100);
093    conf.setInt("hbase.regionserver.maxlogs", 10);
094    conf.setLong("hbase.master.logcleaner.ttl", 10);
095    conf.setInt("zookeeper.recovery.retry", 1);
096    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
097    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
098    conf.setInt("replication.stats.thread.period.seconds", 5);
099    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
100    conf.setLong("replication.sleep.before.failover", 2000);
101    conf.setInt("replication.source.maxretriesmultiplier", 10);
102    conf.setFloat("replication.source.ratio", 1.0f);
103    conf.setBoolean("replication.source.eof.autorecovery", true);
104  }
105
106  protected static void startClusters() throws Exception {
107    ZK_UTIL.startMiniZKCluster();
108    initTestingUtility(UTIL1, "/cluster1");
109    initTestingUtility(UTIL2, "/cluster2");
110    StartTestingClusterOption option =
111      StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
112    UTIL1.startMiniCluster(option);
113    UTIL2.startMiniCluster(option);
114    TableDescriptor td =
115      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
116        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
117    UTIL1.getAdmin().createTable(td);
118    UTIL2.getAdmin().createTable(td);
119    FileSystem fs1 = UTIL1.getTestFileSystem();
120    FileSystem fs2 = UTIL2.getTestFileSystem();
121    REMOTE_WAL_DIR1 =
122      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
123        "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
124    REMOTE_WAL_DIR2 =
125      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
126        "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
127    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
128      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
129        .setReplicateAllUserTables(false)
130        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
131        .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build());
132    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
133      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getRpcConnnectionURI())
134        .setReplicateAllUserTables(false)
135        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
136        .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
137  }
138
139  private static void shutdown(HBaseTestingUtil util) throws Exception {
140    if (util.getHBaseCluster() == null) {
141      return;
142    }
143    Admin admin = util.getAdmin();
144    if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) {
145      if (
146        admin.getReplicationPeerSyncReplicationState(PEER_ID)
147            != SyncReplicationState.DOWNGRADE_ACTIVE
148      ) {
149        admin.transitReplicationPeerSyncReplicationState(PEER_ID,
150          SyncReplicationState.DOWNGRADE_ACTIVE);
151      }
152      admin.removeReplicationPeer(PEER_ID);
153    }
154    util.shutdownMiniCluster();
155  }
156
157  @AfterAll
158  @AfterClass
159  public static void tearDown() throws Exception {
160    shutdown(UTIL1);
161    shutdown(UTIL2);
162    ZK_UTIL.shutdownMiniZKCluster();
163  }
164
165  protected final void write(HBaseTestingUtil util, int start, int end) throws IOException {
166    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
167      for (int i = start; i < end; i++) {
168        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
169      }
170    }
171  }
172
173  protected final void verify(HBaseTestingUtil util, int start, int end) throws IOException {
174    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
175      for (int i = start; i < end; i++) {
176        assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
177      }
178    }
179  }
180
181  protected final void verifyThroughRegion(HBaseTestingUtil util, int start, int end)
182    throws IOException {
183    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
184    for (int i = start; i < end; i++) {
185      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
186    }
187  }
188
189  protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtil util, int start, int end)
190    throws IOException {
191    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
192    for (int i = start; i < end; i++) {
193      assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty());
194    }
195  }
196
197  protected final void waitUntilReplicationDone(HBaseTestingUtil util, int end) throws Exception {
198    // The reject check is in RSRpcService so we can still read through HRegion
199    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
200    util.waitFor(30000, new ExplainingPredicate<Exception>() {
201
202      @Override
203      public boolean evaluate() throws Exception {
204        return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
205      }
206
207      @Override
208      public String explainFailure() throws Exception {
209        return "Replication has not been catched up yet";
210      }
211    });
212  }
213
214  protected final void writeAndVerifyReplication(HBaseTestingUtil util1, HBaseTestingUtil util2,
215    int start, int end) throws Exception {
216    write(util1, start, end);
217    waitUntilReplicationDone(util2, end);
218    verifyThroughRegion(util2, start, end);
219  }
220
221  protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
222    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
223    return getRemoteWALDir(remoteWALDir, peerId);
224  }
225
226  protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) {
227    return new Path(remoteWALDir, peerId);
228  }
229
230  protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
231    return new Path(remoteWALDir, peerId + "-replay");
232  }
233
234  protected final void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtil utility)
235    throws Exception {
236    ReplicationPeerStorage rps = ReplicationStorageFactory.getReplicationPeerStorage(
237      utility.getTestFileSystem(), utility.getZooKeeperWatcher(), utility.getConfiguration());
238    try {
239      rps.getPeerSyncReplicationState(peerId);
240      fail("Should throw exception when get the sync replication state of a removed peer.");
241    } catch (ReplicationException e) {
242      // ignore.
243    }
244    try {
245      rps.getPeerNewSyncReplicationState(peerId);
246      fail("Should throw exception when get the new sync replication state of a removed peer");
247    } catch (ReplicationException e) {
248      // ignore.
249    }
250    try (FileSystem fs = utility.getTestFileSystem()) {
251      assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId)));
252      assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId)));
253    }
254  }
255
256  private void assertRejection(Throwable error) {
257    assertThat(error, instanceOf(DoNotRetryIOException.class));
258    assertTrue(error.getMessage().contains("Reject to apply to sink cluster"));
259    assertTrue(error.getMessage().contains(TABLE_NAME.toString()));
260  }
261
262  protected final void verifyReplicationRequestRejection(HBaseTestingUtil utility,
263    boolean expectedRejection) throws Exception {
264    HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
265    AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
266    Entry[] entries = new Entry[10];
267    for (int i = 0; i < entries.length; i++) {
268      entries[i] =
269        new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
270    }
271    if (!expectedRejection) {
272      FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
273        connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
274        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
275    } else {
276      try {
277        FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
278          connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
279          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
280        fail("Should throw IOException when sync-replication state is in A or DA");
281      } catch (RemoteException e) {
282        assertRejection(e.unwrapRemoteException());
283      } catch (DoNotRetryIOException e) {
284        assertRejection(e);
285      }
286    }
287  }
288
289  protected final void waitUntilDeleted(HBaseTestingUtil util, Path remoteWAL) throws Exception {
290    MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem();
291    util.waitFor(30000, new ExplainingPredicate<Exception>() {
292
293      @Override
294      public boolean evaluate() throws Exception {
295        return !mfs.getWALFileSystem().exists(remoteWAL);
296      }
297
298      @Override
299      public String explainFailure() throws Exception {
300        return remoteWAL + " has not been deleted yet";
301      }
302    });
303  }
304}