/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests the asynchronous replication admin operations (enable/disable table replication) when
 * more than one cluster is running. A second mini cluster is started and registered as
 * replication peer {@link #ID_SECOND} of the first one.
 */
@RunWith(Parameterized.class)
@Category({LargeTests.class, ClientTests.class})
public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestAsyncReplicationAdminApiWithClusters.class);

  /** Id under which the second cluster is registered as a replication peer. */
  private final static String ID_SECOND = "2";

  /** Testing utility that owns the second (peer) mini cluster. */
  private static HBaseTestingUtility TEST_UTIL2;
  /** Configuration of the second cluster; a copy of the first one with a different znode parent. */
  private static Configuration conf2;
  /** Async admin connected to the second (peer) cluster. */
  private static AsyncAdmin admin2;

  /**
   * Starts both mini clusters and adds the second one as replication peer {@link #ID_SECOND} of
   * the first.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
    TEST_UTIL.startMiniCluster();
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();

    // The second cluster reuses the first cluster's configuration but lives under its own
    // ZooKeeper parent znode.
    conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    TEST_UTIL2 = new HBaseTestingUtility(conf2);
    TEST_UTIL2.startMiniCluster();
    // NOTE(review): nothing in this class closes the connection backing admin2 or shuts down
    // TEST_UTIL2 -- confirm whether that is handled elsewhere or needs an @AfterClass hook.
    admin2 =
        ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(TEST_UTIL2.getClusterKey());
    ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
  }

  /**
   * Removes, from both clusters, every table whose name starts with the current test's table
   * name.
   */
  @Override
  @After
  public void tearDown() throws Exception {
    Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
    cleanupTables(admin, pattern);
    cleanupTables(admin2, pattern);
  }

  /**
   * Disables and deletes every table matching {@code pattern} through the given admin, blocking
   * until the whole cleanup has finished.
   *
   * @param admin the admin of the cluster to clean up
   * @param pattern the pattern the names of the tables to delete must match
   */
  private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
    admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
      if (tables != null) {
        tables.forEach(table -> {
          try {
            admin.disableTable(table).join();
          } catch (Exception e) {
            // Best effort: a failure to disable is tolerated and the delete is still attempted.
            // Log the table that is actually being processed, not the test's base table name.
            LOG.debug("Table: " + table + " already disabled, so just deleting it.");
          }
          admin.deleteTable(table).join();
        });
      }
    }, ForkJoinPool.commonPool()).join();
  }

  /**
   * Creates a table with the single column family {@code FAMILY} through the given admin and
   * waits for the creation to complete.
   *
   * @param admin the admin of the cluster in which to create the table
   * @param tableName the name of the table to create
   */
  private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
    admin.createTable(builder.build()).join();
  }

  /**
   * Enabling table replication must set every column family's scope to global, and disabling it
   * must set the scope back to local.
   */
  @Test
  public void testEnableAndDisableTableReplication() throws Exception {
    // default replication scope is local
    createTableWithDefaultConf(tableName);
    admin.enableTableReplication(tableName).join();
    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }

    admin.disableTableReplication(tableName).join();
    tableDesc = admin.getDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
    }
  }

  /**
   * Enabling replication for a table that only exists in the source cluster must create the
   * table in the peer cluster.
   */
  @Test
  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
    // Only create table in source cluster
    createTableWithDefaultConf(tableName);
    assertFalse(admin2.tableExists(tableName).get());
    admin.enableTableReplication(tableName).join();
    assertTrue(admin2.tableExists(tableName).get());
  }

  /**
   * Enabling replication must fail while the table descriptors of the two clusters differ, and
   * succeed once the source table has been modified to match the peer.
   */
  @Test
  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
    createTableWithDefaultConf(admin, tableName);
    createTableWithDefaultConf(admin2, tableName);
    TableDescriptorBuilder builder =
        TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName).get());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily"))
        .build());
    // Add the extra family in the peer cluster only, so that the descriptors diverge.
    admin2.disableTable(tableName).join();
    admin2.modifyTable(builder.build()).join();
    admin2.enableTable(tableName).join();

    try {
      admin.enableTableReplication(tableName).join();
      fail("Exception should be thrown if table descriptors in the clusters are not same.");
    } catch (Exception ignored) {
      // ok
    }

    // Apply the same descriptor in the source cluster; enabling replication must now succeed.
    admin.disableTable(tableName).join();
    admin.modifyTable(builder.build()).join();
    admin.enableTable(tableName).join();
    admin.enableTableReplication(tableName).join();
    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }
  }

  /**
   * Disabling replication for a table that was never created must fail with a
   * {@link TableNotFoundException} as the cause.
   */
  @Test
  public void testDisableReplicationForNonExistingTable() throws Exception {
    try {
      admin.disableTableReplication(tableName).join();
      // Without this the test would pass vacuously when no exception is raised.
      fail("Disabling replication should fail for a table that does not exist.");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof TableNotFoundException);
    }
  }

  /**
   * Enabling replication for a table that was never created must fail with a
   * {@link TableNotFoundException} as the cause.
   */
  @Test
  public void testEnableReplicationForNonExistingTable() throws Exception {
    try {
      admin.enableTableReplication(tableName).join();
      // Without this the test would pass vacuously when no exception is raised.
      fail("Enabling replication should fail for a table that does not exist.");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof TableNotFoundException);
    }
  }

  /**
   * Disabling replication with a {@code null} table name must fail with an
   * {@link IllegalArgumentException} as the cause.
   */
  @Test
  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
    try {
      admin.disableTableReplication(null).join();
      // Without this the test would pass vacuously when no exception is raised.
      fail("Disabling replication should fail for a null table name.");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof IllegalArgumentException);
    }
  }

  /**
   * Enabling replication with a {@code null} table name must fail with an
   * {@link IllegalArgumentException} as the cause.
   */
  @Test
  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
    try {
      admin.enableTableReplication(null).join();
      // Without this the test would pass vacuously when no exception is raised.
      fail("Enabling replication should fail for a null table name.");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof IllegalArgumentException);
    }
  }

  /*
   * Enabling table replication should create the table in the peer cluster only if the table is
   * part of the table-cfs the user explicitly configured for the peer. See HBASE-14717.
   */
  @Test
  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
    TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
    // Only create table in source cluster
    createTableWithDefaultConf(tableName);
    createTableWithDefaultConf(tableName2);
    assertFalse("Table should not exists in the peer cluster",
      admin2.tableExists(tableName).get());
    assertFalse("Table should not exists in the peer cluster",
      admin2.tableExists(tableName2).get());

    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName, null);
    ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
    rpc.setReplicateAllUserTables(false);
    rpc.setTableCFsMap(tableCfs);
    try {
      // Only add tableName to replication peer config
      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
      admin.enableTableReplication(tableName2).join();
      assertFalse("Table should not be created if user has set table cfs explicitly for the "
          + "peer and this is not part of that collection", admin2.tableExists(tableName2).get());

      // Add tableName2 to replication peer config, too
      tableCfs.put(tableName2, null);
      rpc.setTableCFsMap(tableCfs);
      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
      admin.enableTableReplication(tableName2).join();
      assertTrue(
        "Table should be created if user has explicitly added table into table cfs collection",
        admin2.tableExists(tableName2).get());
    } finally {
      // Restore the peer to replicating all user tables so that other tests are not affected.
      rpc.setTableCFsMap(null);
      rpc.setReplicateAllUserTables(true);
      admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
    }
  }
}