001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.regex.Pattern; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HBaseZKTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.StartTestingClusterOption; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.AsyncClusterConnection; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.client.TableDescriptor; 047import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 048import org.apache.hadoop.hbase.master.MasterFileSystem; 049import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; 050import org.apache.hadoop.hbase.regionserver.HRegion; 051import org.apache.hadoop.hbase.regionserver.HRegionServer; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.FutureUtils; 054import org.apache.hadoop.hbase.wal.WAL.Entry; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.apache.hadoop.hbase.wal.WALKeyImpl; 057import org.apache.hadoop.ipc.RemoteException; 058import org.junit.AfterClass; 059import org.junit.jupiter.api.AfterAll; 060 061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 062 063/** 064 * Sync replication test base without BeforeAll method. 065 * @see SyncReplicationTestBase 066 */ 067public class SyncReplicationTestBaseNoBeforeAll { 068 069 protected static final HBaseZKTestingUtil ZK_UTIL = new HBaseZKTestingUtil(); 070 071 protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); 072 073 protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); 074 075 protected static TableName TABLE_NAME = TableName.valueOf("SyncRep"); 076 077 protected static byte[] CF = Bytes.toBytes("cf"); 078 079 protected static byte[] CQ = Bytes.toBytes("cq"); 080 081 protected static String PEER_ID = "1"; 082 083 protected static Path REMOTE_WAL_DIR1; 084 085 protected static Path REMOTE_WAL_DIR2; 086 087 protected static void initTestingUtility(HBaseTestingUtil util, String zkParent) { 088 util.setZkCluster(ZK_UTIL.getZkCluster()); 089 Configuration conf = util.getConfiguration(); 090 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent); 091 conf.setInt("replication.source.size.capacity", 102400); 092 conf.setLong("replication.source.sleepforretries", 100); 093 conf.setInt("hbase.regionserver.maxlogs", 10); 094 conf.setLong("hbase.master.logcleaner.ttl", 10); 095 conf.setInt("zookeeper.recovery.retry", 1); 096 conf.setInt("zookeeper.recovery.retry.intervalmill", 10); 097 conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 098 conf.setInt("replication.stats.thread.period.seconds", 5); 099 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); 100 conf.setLong("replication.sleep.before.failover", 2000); 101 conf.setInt("replication.source.maxretriesmultiplier", 10); 102 conf.setFloat("replication.source.ratio", 1.0f); 103 conf.setBoolean("replication.source.eof.autorecovery", true); 104 } 105 106 protected static void startClusters() throws Exception { 107 ZK_UTIL.startMiniZKCluster(); 108 initTestingUtility(UTIL1, "/cluster1"); 109 initTestingUtility(UTIL2, "/cluster2"); 110 StartTestingClusterOption option = 111 StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build(); 112 UTIL1.startMiniCluster(option); 113 UTIL2.startMiniCluster(option); 114 TableDescriptor td = 115 TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder 116 .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); 117 UTIL1.getAdmin().createTable(td); 118 UTIL2.getAdmin().createTable(td); 119 FileSystem fs1 = UTIL1.getTestFileSystem(); 120 FileSystem fs2 = UTIL2.getTestFileSystem(); 121 REMOTE_WAL_DIR1 = 122 new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), 123 "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory()); 124 REMOTE_WAL_DIR2 = 125 new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), 126 "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory()); 127 UTIL1.getAdmin().addReplicationPeer(PEER_ID, 128 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()) 129 .setReplicateAllUserTables(false) 130 .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) 131 .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build()); 132 UTIL2.getAdmin().addReplicationPeer(PEER_ID, 133 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getRpcConnnectionURI()) 134 .setReplicateAllUserTables(false) 135 .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) 136 .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build()); 137 } 138 139 private static void shutdown(HBaseTestingUtil util) throws Exception { 140 if (util.getHBaseCluster() == null) { 141 return; 142 } 143 Admin admin = util.getAdmin(); 144 if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) { 145 if ( 146 admin.getReplicationPeerSyncReplicationState(PEER_ID) 147 != SyncReplicationState.DOWNGRADE_ACTIVE 148 ) { 149 admin.transitReplicationPeerSyncReplicationState(PEER_ID, 150 SyncReplicationState.DOWNGRADE_ACTIVE); 151 } 152 admin.removeReplicationPeer(PEER_ID); 153 } 154 util.shutdownMiniCluster(); 155 } 156 157 @AfterAll 158 @AfterClass 159 public static void tearDown() throws Exception { 160 shutdown(UTIL1); 161 shutdown(UTIL2); 162 ZK_UTIL.shutdownMiniZKCluster(); 163 } 164 165 protected final void write(HBaseTestingUtil util, int start, int end) throws IOException { 166 try (Table table = util.getConnection().getTable(TABLE_NAME)) { 167 for (int i = start; i < end; i++) { 168 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 169 } 170 } 171 } 172 173 protected final void verify(HBaseTestingUtil util, int start, int end) throws IOException { 174 try (Table table = util.getConnection().getTable(TABLE_NAME)) { 175 for (int i = start; i < end; i++) { 176 assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ))); 177 } 178 } 179 } 180 181 protected final void verifyThroughRegion(HBaseTestingUtil util, int start, int end) 182 throws IOException { 183 HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 184 for (int i = start; i < end; i++) { 185 assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ))); 186 } 187 } 188 189 protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtil util, int start, int end) 190 throws IOException { 191 HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 192 for (int i = start; i < end; i++) { 193 assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty()); 194 } 195 } 196 197 protected final void waitUntilReplicationDone(HBaseTestingUtil util, int end) throws Exception { 198 // The reject check is in RSRpcService so we can still read through HRegion 199 HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); 200 util.waitFor(30000, new ExplainingPredicate<Exception>() { 201 202 @Override 203 public boolean evaluate() throws Exception { 204 return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty(); 205 } 206 207 @Override 208 public String explainFailure() throws Exception { 209 return "Replication has not been catched up yet"; 210 } 211 }); 212 } 213 214 protected final void writeAndVerifyReplication(HBaseTestingUtil util1, HBaseTestingUtil util2, 215 int start, int end) throws Exception { 216 write(util1, start, end); 217 waitUntilReplicationDone(util2, end); 218 verifyThroughRegion(util2, start, end); 219 } 220 221 protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) { 222 Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); 223 return getRemoteWALDir(remoteWALDir, peerId); 224 } 225 226 protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) { 227 return new Path(remoteWALDir, peerId); 228 } 229 230 protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) { 231 return new Path(remoteWALDir, peerId + "-replay"); 232 } 233 234 protected final void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtil utility) 235 throws Exception { 236 ReplicationPeerStorage rps = ReplicationStorageFactory.getReplicationPeerStorage( 237 utility.getTestFileSystem(), utility.getZooKeeperWatcher(), utility.getConfiguration()); 238 try { 239 rps.getPeerSyncReplicationState(peerId); 240 fail("Should throw exception when get the sync replication state of a removed peer."); 241 } catch (ReplicationException e) { 242 // ignore. 243 } 244 try { 245 rps.getPeerNewSyncReplicationState(peerId); 246 fail("Should throw exception when get the new sync replication state of a removed peer"); 247 } catch (ReplicationException e) { 248 // ignore. 249 } 250 try (FileSystem fs = utility.getTestFileSystem()) { 251 assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId))); 252 assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId))); 253 } 254 } 255 256 private void assertRejection(Throwable error) { 257 assertThat(error, instanceOf(DoNotRetryIOException.class)); 258 assertTrue(error.getMessage().contains("Reject to apply to sink cluster")); 259 assertTrue(error.getMessage().contains(TABLE_NAME.toString())); 260 } 261 262 protected final void verifyReplicationRequestRejection(HBaseTestingUtil utility, 263 boolean expectedRejection) throws Exception { 264 HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME); 265 AsyncClusterConnection connection = regionServer.getAsyncClusterConnection(); 266 Entry[] entries = new Entry[10]; 267 for (int i = 0; i < entries.length; i++) { 268 entries[i] = 269 new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit()); 270 } 271 if (!expectedRejection) { 272 FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry( 273 connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null, 274 HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT)); 275 } else { 276 try { 277 FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry( 278 connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null, 279 HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT)); 280 fail("Should throw IOException when sync-replication state is in A or DA"); 281 } catch (RemoteException e) { 282 assertRejection(e.unwrapRemoteException()); 283 } catch (DoNotRetryIOException e) { 284 assertRejection(e); 285 } 286 } 287 } 288 289 protected final void waitUntilDeleted(HBaseTestingUtil util, Path remoteWAL) throws Exception { 290 MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem(); 291 util.waitFor(30000, new ExplainingPredicate<Exception>() { 292 293 @Override 294 public boolean evaluate() throws Exception { 295 return !mfs.getWALFileSystem().exists(remoteWAL); 296 } 297 298 @Override 299 public String explainFailure() throws Exception { 300 return remoteWAL + " has not been deleted yet"; 301 } 302 }); 303 } 304}