/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;

/**
 * Tests asynchronous replication admin operations when more than one cluster is involved.
 * <p>
 * A second mini cluster is started under the ZooKeeper parent znode {@code /2} and registered on
 * the first cluster as replication peer {@value #ID_SECOND}.
 */
@Tag(LargeTests.TAG)
@Tag(ClientTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: policy = {0}")
public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {

  /** Peer id under which the second cluster is registered on the first cluster. */
  private final static String ID_SECOND = "2";

  private static HBaseTestingUtil TEST_UTIL2;
  private static Configuration conf2;
  private static AsyncAdmin admin2;
  /** Connection to the second (peer) cluster; {@link #admin2} is obtained from it. */
  private static AsyncConnection connection;

  public TestAsyncReplicationAdminApiWithClusters(Supplier<AsyncAdmin> admin) {
    super(admin);
  }

  /**
   * Starts both mini clusters, opens a connection to each, and adds the second cluster as a
   * replication peer of the first one.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
    TEST_UTIL.startMiniCluster();
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();

    // The second cluster copies the first cluster's configuration but uses its own parent znode.
    conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    TEST_UTIL2 = new HBaseTestingUtil(conf2);
    TEST_UTIL2.startMiniCluster();

    connection = ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get();
    admin2 = connection.getAdmin();

    ReplicationPeerConfig rpc =
      ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL2.getRpcConnnectionURI()).build();
    ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
  }

  /**
   * Releases everything acquired in {@link #setUpBeforeClass()}: runs the base class teardown,
   * closes the connection to the second cluster and shuts the second mini cluster down. Each step
   * runs even if a previous one throws, and the null guards cover a setup that failed midway.
   */
  @AfterAll
  public static void clearUp() throws Exception {
    try {
      TestAsyncAdminBase.tearDownAfterClass();
    } finally {
      try {
        if (connection != null) {
          connection.close();
        }
      } finally {
        if (TEST_UTIL2 != null) {
          TEST_UTIL2.shutdownMiniCluster();
        }
      }
    }
  }

  /**
   * Removes, from both clusters, every table whose name starts with the current test's table
   * name.
   */
  @Override
  @AfterEach
  public void tearDown() throws Exception {
    Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
    cleanupTables(admin, pattern);
    cleanupTables(admin2, pattern);
  }

  /**
   * Disables and deletes every table matching {@code pattern} through the given admin, blocking
   * until all of them have been processed.
   * @param admin   admin of the cluster to clean up
   * @param pattern pattern the names of the tables to delete must match
   */
  private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
    admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
      if (tables != null) {
        tables.forEach(table -> {
          try {
            admin.disableTable(table).join();
          } catch (Exception e) {
            // Best effort: any failure here is treated as "already disabled".
            LOG.debug("Table: " + table + " already disabled, so just deleting it.");
          }
          admin.deleteTable(table).join();
        });
      }
    }, ForkJoinPool.commonPool()).join();
  }

  /**
   * Creates a table with the single column family {@code FAMILY} through the given admin and
   * waits for the creation to finish.
   */
  private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
    admin.createTable(builder.build()).join();
  }

  /**
   * Enabling table replication must switch every column family to global scope, and disabling it
   * must switch them back to local scope.
   */
  @TestTemplate
  public void testEnableAndDisableTableReplication() throws Exception {
    // default replication scope is local
    createTableWithDefaultConf(tableName);
    admin.enableTableReplication(tableName).join();
    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }

    admin.disableTableReplication(tableName).join();
    tableDesc = admin.getDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
    }
  }

  /**
   * Enabling table replication must create the table on the peer cluster when it is missing
   * there.
   */
  @TestTemplate
  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
    // Only create table in source cluster
    createTableWithDefaultConf(tableName);
    assertFalse(admin2.tableExists(tableName).get());
    admin.enableTableReplication(tableName).join();
    assertTrue(admin2.tableExists(tableName).get());
  }

  /**
   * Enabling table replication must fail while the table descriptors of the two clusters differ,
   * and succeed once the source table has been modified to match the peer.
   */
  @TestTemplate
  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
    createTableWithDefaultConf(admin, tableName);
    createTableWithDefaultConf(admin2, tableName);
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName).get());
    builder.setColumnFamily(
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily")).build());
    // Give the peer table an extra family so the two descriptors no longer match.
    admin2.disableTable(tableName).join();
    admin2.modifyTable(builder.build()).join();
    admin2.enableTable(tableName).join();

    try {
      admin.enableTableReplication(tableName).join();
      fail("Exception should be thrown if table descriptors in the clusters are not same.");
    } catch (Exception ignored) {
      // ok
    }

    // Apply the same descriptor on the source cluster; enabling replication must now succeed.
    admin.disableTable(tableName).join();
    admin.modifyTable(builder.build()).join();
    admin.enableTable(tableName).join();
    admin.enableTableReplication(tableName).join();
    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }
  }

  /**
   * Disabling replication for a table that does not exist must fail with a
   * {@link TableNotFoundException}.
   */
  @TestTemplate
  public void testDisableReplicationForNonExistingTable() throws Exception {
    try {
      admin.disableTableReplication(tableName).join();
      // fail() throws an Error, so the catch block below does not swallow it.
      fail("Disabling replication should fail for a table that does not exist.");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof TableNotFoundException);
    }
  }

  /**
   * Enabling replication for a table that does not exist must fail with a
   * {@link TableNotFoundException}.
   */
  @TestTemplate
  public void testEnableReplicationForNonExistingTable() throws Exception {
    try {
      admin.enableTableReplication(tableName).join();
      // fail() throws an Error, so the catch block below does not swallow it.
      fail("Enabling replication should fail for a table that does not exist.");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof TableNotFoundException);
    }
  }

  /**
   * Disabling replication with a {@code null} table name must fail with an
   * {@link IllegalArgumentException}.
   */
  @TestTemplate
  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
    try {
      admin.disableTableReplication(null).join();
      // fail() throws an Error, so the catch block below does not swallow it.
      fail("Disabling replication should fail for a null table name.");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof IllegalArgumentException);
    }
  }

  /**
   * Enabling replication with a {@code null} table name must fail with an
   * {@link IllegalArgumentException}.
   */
  @TestTemplate
  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
    try {
      admin.enableTableReplication(null).join();
      // fail() throws an Error, so the catch block below does not swallow it.
      fail("Enabling replication should fail for a null table name.");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof IllegalArgumentException);
    }
  }

  /**
   * Enabling table replication should create the table on the peer only when the table is part
   * of the table-cfs the user explicitly configured for that peer. See HBASE-14717.
   */
  @TestTemplate
  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
    TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
    // Only create table in source cluster
    createTableWithDefaultConf(tableName);
    createTableWithDefaultConf(tableName2);
    assertFalse(admin2.tableExists(tableName).get(), "Table should not exists in the peer cluster");
    assertFalse(admin2.tableExists(tableName2).get(),
      "Table should not exists in the peer cluster");

    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName, null);
    ReplicationPeerConfigBuilder rpcBuilder =
      ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig(ID_SECOND).get())
        .setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    try {
      // Only add tableName to replication peer config
      admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join();
      admin.enableTableReplication(tableName2).join();
      assertFalse(admin2.tableExists(tableName2).get(), "Table should not be created if user "
        + "has set table cfs explicitly for the peer and this is not part of that collection");

      // Add tableName2 to replication peer config, too
      tableCfs.put(tableName2, null);
      rpcBuilder.setTableCFsMap(tableCfs);
      admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join();
      admin.enableTableReplication(tableName2).join();
      assertTrue(admin2.tableExists(tableName2).get(),
        "Table should be created if user has explicitly added table into table cfs collection");
    } finally {
      // Restore the peer to replicate all user tables so other tests see the initial setup.
      rpcBuilder.setTableCFsMap(null).setReplicateAllUserTables(true);
      admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join();
    }
  }
}