001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.concurrent.CompletionException; 031import java.util.concurrent.ForkJoinPool; 032import java.util.regex.Pattern; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.TableNotFoundException; 040import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 041import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.LargeTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.After; 046import org.junit.AfterClass; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.junit.runner.RunWith; 052import org.junit.runners.Parameterized; 053 054/** 055 * Class to test asynchronous replication admin operations when more than 1 cluster 056 */ 057@RunWith(Parameterized.class) 058@Category({ LargeTests.class, ClientTests.class }) 059public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestAsyncReplicationAdminApiWithClusters.class); 064 065 private final static String ID_SECOND = "2"; 066 067 private static HBaseTestingUtil TEST_UTIL2; 068 private static Configuration conf2; 069 private static AsyncAdmin admin2; 070 private static AsyncConnection connection; 071 072 @BeforeClass 073 public static void setUpBeforeClass() throws Exception { 074 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 075 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 076 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 077 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 078 TEST_UTIL.startMiniCluster(); 079 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 080 081 conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 082 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 083 TEST_UTIL2 = new HBaseTestingUtil(conf2); 084 TEST_UTIL2.startMiniCluster(); 085 086 connection = ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get(); 087 admin2 = connection.getAdmin(); 088 089 ReplicationPeerConfig rpc = 090 ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL2.getRpcConnnectionURI()).build(); 091 ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join(); 092 } 093 094 @AfterClass 095 public static void clearUp() throws IOException { 096 connection.close(); 097 } 098 099 @Override 100 @After 101 public void tearDown() throws Exception { 102 Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*"); 103 cleanupTables(admin, pattern); 104 cleanupTables(admin2, pattern); 105 } 106 107 private void cleanupTables(AsyncAdmin admin, Pattern pattern) { 108 admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> { 109 if (tables != null) { 110 tables.forEach(table -> { 111 try { 112 admin.disableTable(table).join(); 113 } catch (Exception e) { 114 LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); 115 } 116 admin.deleteTable(table).join(); 117 }); 118 } 119 }, ForkJoinPool.commonPool()).join(); 120 } 121 122 private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) { 123 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 124 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); 125 admin.createTable(builder.build()).join(); 126 } 127 128 @Test 129 public void testEnableAndDisableTableReplication() throws Exception { 130 // default replication scope is local 131 createTableWithDefaultConf(tableName); 132 admin.enableTableReplication(tableName).join(); 133 TableDescriptor tableDesc = admin.getDescriptor(tableName).get(); 134 for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) { 135 assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); 136 } 137 138 admin.disableTableReplication(tableName).join(); 139 tableDesc = admin.getDescriptor(tableName).get(); 140 for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) { 141 assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope()); 142 } 143 } 144 145 @Test 146 public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception { 147 // Only create table in source cluster 148 createTableWithDefaultConf(tableName); 149 assertFalse(admin2.tableExists(tableName).get()); 150 admin.enableTableReplication(tableName).join(); 151 assertTrue(admin2.tableExists(tableName).get()); 152 } 153 154 @Test 155 public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception { 156 createTableWithDefaultConf(admin, tableName); 157 createTableWithDefaultConf(admin2, tableName); 158 TableDescriptorBuilder builder = 159 TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName).get()); 160 builder.setColumnFamily( 161 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily")).build()); 162 admin2.disableTable(tableName).join(); 163 admin2.modifyTable(builder.build()).join(); 164 admin2.enableTable(tableName).join(); 165 166 try { 167 admin.enableTableReplication(tableName).join(); 168 fail("Exception should be thrown if table descriptors in the clusters are not same."); 169 } catch (Exception ignored) { 170 // ok 171 } 172 173 admin.disableTable(tableName).join(); 174 admin.modifyTable(builder.build()).join(); 175 admin.enableTable(tableName).join(); 176 admin.enableTableReplication(tableName).join(); 177 TableDescriptor tableDesc = admin.getDescriptor(tableName).get(); 178 for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) { 179 assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); 180 } 181 } 182 183 @Test 184 public void testDisableReplicationForNonExistingTable() throws Exception { 185 try { 186 admin.disableTableReplication(tableName).join(); 187 } catch (CompletionException e) { 188 assertTrue(e.getCause() instanceof TableNotFoundException); 189 } 190 } 191 192 @Test 193 public void testEnableReplicationForNonExistingTable() throws Exception { 194 try { 195 admin.enableTableReplication(tableName).join(); 196 } catch (CompletionException e) { 197 assertTrue(e.getCause() instanceof TableNotFoundException); 198 } 199 } 200 201 @Test 202 public void testDisableReplicationWhenTableNameAsNull() throws Exception { 203 try { 204 admin.disableTableReplication(null).join(); 205 } catch (CompletionException e) { 206 assertTrue(e.getCause() instanceof IllegalArgumentException); 207 } 208 } 209 210 @Test 211 public void testEnableReplicationWhenTableNameAsNull() throws Exception { 212 try { 213 admin.enableTableReplication(null).join(); 214 } catch (CompletionException e) { 215 assertTrue(e.getCause() instanceof IllegalArgumentException); 216 } 217 } 218 219 /* 220 * Test enable table replication should create table only in user explicit specified table-cfs. 221 * HBASE-14717 222 */ 223 @Test 224 public void testEnableReplicationForExplicitSetTableCfs() throws Exception { 225 TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2"); 226 // Only create table in source cluster 227 createTableWithDefaultConf(tableName); 228 createTableWithDefaultConf(tableName2); 229 assertFalse("Table should not exists in the peer cluster", admin2.tableExists(tableName).get()); 230 assertFalse("Table should not exists in the peer cluster", 231 admin2.tableExists(tableName2).get()); 232 233 Map<TableName, List<String>> tableCfs = new HashMap<>(); 234 tableCfs.put(tableName, null); 235 ReplicationPeerConfigBuilder rpcBuilder = 236 ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig(ID_SECOND).get()) 237 .setReplicateAllUserTables(false).setTableCFsMap(tableCfs); 238 try { 239 // Only add tableName to replication peer config 240 admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join(); 241 admin.enableTableReplication(tableName2).join(); 242 assertFalse("Table should not be created if user has set table cfs explicitly for the " 243 + "peer and this is not part of that collection", admin2.tableExists(tableName2).get()); 244 245 // Add tableName2 to replication peer config, too 246 tableCfs.put(tableName2, null); 247 rpcBuilder.setTableCFsMap(tableCfs); 248 admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join(); 249 admin.enableTableReplication(tableName2).join(); 250 assertTrue( 251 "Table should be created if user has explicitly added table into table cfs collection", 252 admin2.tableExists(tableName2).get()); 253 } finally { 254 rpcBuilder.setTableCFsMap(null).setReplicateAllUserTables(true).build(); 255 admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join(); 256 } 257 } 258}