001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.replication; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 033import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 034import org.apache.hadoop.hbase.testclassification.ClientTests; 035import org.apache.hadoop.hbase.testclassification.MediumTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.junit.jupiter.api.AfterAll; 038import org.junit.jupiter.api.BeforeAll; 039import org.junit.jupiter.api.Tag; 040import org.junit.jupiter.api.Test; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044@Tag(MediumTests.TAG) 045@Tag(ClientTests.TAG) 046public class TestReplicationAdminForSyncReplication { 047 048 private static final Logger LOG = 049 LoggerFactory.getLogger(TestReplicationAdminForSyncReplication.class); 050 051 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 052 053 private static Admin hbaseAdmin; 054 055 @BeforeAll 056 public static void setUpBeforeClass() throws Exception { 057 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 058 TEST_UTIL.startMiniCluster(); 059 hbaseAdmin = TEST_UTIL.getAdmin(); 060 } 061 062 @AfterAll 063 public static void tearDownAfterClass() throws Exception { 064 hbaseAdmin.close(); 065 TEST_UTIL.shutdownMiniCluster(); 066 } 067 068 @Test 069 public void testAddPeerWithSameTable() throws Exception { 070 TableName tableName = TableName.valueOf("testAddPeerWithSameTable"); 071 TEST_UTIL.createTable(tableName, Bytes.toBytes("family")); 072 073 boolean[] success = { true, true, true, true, true, true }; 074 Thread[] threads = new Thread[5]; 075 for (int i = 0; i < 5; i++) { 076 String peerId = "id" + i; 077 String clusterKey = TEST_UTIL.getZkConnectionURI() + "-test" + i; 078 int index = i; 079 threads[i] = new Thread(() -> { 080 try { 081 hbaseAdmin.addReplicationPeer(peerId, 082 buildSyncReplicationPeerConfig(clusterKey, tableName)); 083 } catch (IOException e) { 084 LOG.error("Failed to add replication peer " + peerId); 085 success[index] = false; 086 } 087 }); 088 } 089 for (int i = 0; i < 5; i++) { 090 threads[i].start(); 091 } 092 for (int i = 0; i < 5; i++) { 093 threads[i].join(); 094 } 095 096 int successCount = 0; 097 for (int i = 0; i < 5; i++) { 098 if (success[i]) { 099 successCount++; 100 } 101 } 102 assertEquals(1, successCount, "Only one peer can be added successfully"); 103 } 104 105 private ReplicationPeerConfig buildSyncReplicationPeerConfig(String clusterKey, 106 TableName tableName) throws IOException { 107 Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL"); 108 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 109 builder.setClusterKey(clusterKey); 110 builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(), 111 TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString()); 112 builder.setReplicateAllUserTables(false); 113 Map<TableName, List<String>> tableCfs = new HashMap<>(); 114 tableCfs.put(tableName, new ArrayList<>()); 115 builder.setTableCFsMap(tableCfs); 116 return builder.build(); 117 } 118}