/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Unit testing of ReplicationAdmin with clusters.
 * <p>
 * The tests drive two clusters through {@link #admin1} (built from {@code CONF1}) and
 * {@link #admin2} (built from {@code CONF2}); both configurations, {@code tableName},
 * {@code noRepfamName}, {@code UTIL1} and {@code UTIL2} come from {@link TestReplicationBase}.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestReplicationAdminWithClusters extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationAdminWithClusters.class);

  static Connection connection1;
  static Connection connection2;
  static Admin admin1;
  static Admin admin2;
  static ReplicationAdmin adminExt;

  @Rule
  public TestName name = new TestName();

  /**
   * Runs the base-class setup, then opens one connection and one {@link Admin} per cluster plus
   * a {@link ReplicationAdmin} against {@code CONF1}.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();
    connection1 = ConnectionFactory.createConnection(CONF1);
    connection2 = ConnectionFactory.createConnection(CONF2);
    admin1 = connection1.getAdmin();
    admin2 = connection2.getAdmin();
    adminExt = new ReplicationAdmin(CONF1);
  }

  /**
   * Closes the admins and connections opened in {@link #setUpBeforeClass()} and then runs the
   * base-class teardown.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    admin1.close();
    admin2.close();
    adminExt.close();
    connection1.close();
    connection2.close();
    TestReplicationBase.tearDownAfterClass();
  }

  /**
   * Adds an extra family through {@code admin1} only, disables table replication, and checks
   * that every family reported by {@code admin1} has LOCAL scope. The extra family is removed
   * again at the end.
   */
  @Test
  public void disableNotFullReplication() throws Exception {
    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
    HColumnDescriptor f = new HColumnDescriptor("notReplicatedFamily");
    table.addFamily(f);
    admin1.disableTable(tableName);
    admin1.modifyTable(tableName, table);
    admin1.enableTable(tableName);

    admin1.disableTableReplication(tableName);
    table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
    }

    admin1.deleteColumnFamily(table.getTableName(), f.getName());
  }

  /**
   * Deletes the table through {@code admin2} and checks that enabling table replication through
   * {@code admin1} makes the table exist again on the {@code admin2} side.
   */
  @Test
  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
    admin1.disableTableReplication(tableName);
    admin2.disableTable(tableName);
    admin2.deleteTable(tableName);
    assertFalse(admin2.tableExists(tableName));
    admin1.enableTableReplication(tableName);
    assertTrue(admin2.tableExists(tableName));
  }

  /**
   * Sets every family to LOCAL scope through both admins, enables table replication through
   * {@code admin1}, and checks that every family is then reported with GLOBAL scope.
   */
  @Test
  public void testEnableReplicationWhenReplicationNotEnabled() throws Exception {
    HTableDescriptor table = new HTableDescriptor(admin1.getTableDescriptor(tableName));
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
    }
    admin1.disableTable(tableName);
    admin1.modifyTable(tableName, table);
    admin1.enableTable(tableName);

    admin2.disableTable(tableName);
    admin2.modifyTable(tableName, table);
    admin2.enableTable(tableName);

    admin1.enableTableReplication(tableName);
    table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }
  }

  /**
   * Adds a family through {@code admin2} only and expects enabling replication through
   * {@code admin1} to throw a {@link RuntimeException}. After the same descriptor is applied
   * through {@code admin1}, enabling replication must succeed and report GLOBAL scope for every
   * family. The extra family is removed through both admins at the end.
   */
  @Test
  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
    HColumnDescriptor f = new HColumnDescriptor("newFamily");
    table.addFamily(f);
    admin2.disableTable(tableName);
    admin2.modifyTable(tableName, table);
    admin2.enableTable(tableName);

    try {
      admin1.enableTableReplication(tableName);
      fail("Exception should be thrown if table descriptors in the clusters are not same.");
    } catch (RuntimeException ignored) {
      // Expected: this is the failure the test is looking for. fail() throws an AssertionError,
      // which is not a RuntimeException, so it is not swallowed here.
    }
    admin1.disableTable(tableName);
    admin1.modifyTable(tableName, table);
    admin1.enableTable(tableName);
    admin1.enableTableReplication(tableName);
    table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }

    admin1.deleteColumnFamily(tableName, f.getName());
    admin2.deleteColumnFamily(tableName, f.getName());
  }

  /**
   * Checks that disabling table replication yields LOCAL scope on every family and that
   * enabling it again yields GLOBAL scope on every family.
   */
  @Test
  public void testDisableAndEnableReplication() throws Exception {
    admin1.disableTableReplication(tableName);
    HTableDescriptor table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
    }
    admin1.enableTableReplication(tableName);
    table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }
  }

  /**
   * Creates a table with region replication 5 through {@code admin1}, enables table replication
   * for it, and checks that every family is reported with GLOBAL scope. The table is deleted
   * through both {@code UTIL1} and {@code UTIL2} afterwards.
   */
  @Test
  public void testEnableReplicationForTableWithRegionReplica() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    TableDescriptor td = TableDescriptorBuilder.newBuilder(tn).setRegionReplication(5)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).build()).build();

    admin1.createTable(td);

    try {
      admin1.enableTableReplication(tn);
      td = admin1.getDescriptor(tn);
      for (ColumnFamilyDescriptor fam : td.getColumnFamilies()) {
        assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
      }
    } finally {
      UTIL1.deleteTable(tn);
      UTIL2.deleteTable(tn);
    }
  }

  /** Disabling replication for a table named after this test must fail as "not found". */
  @Test(expected = TableNotFoundException.class)
  public void testDisableReplicationForNonExistingTable() throws Exception {
    admin1.disableTableReplication(TableName.valueOf(name.getMethodName()));
  }

  /** Enabling replication for a table named after this test must fail as "not found". */
  @Test(expected = TableNotFoundException.class)
  public void testEnableReplicationForNonExistingTable() throws Exception {
    admin1.enableTableReplication(TableName.valueOf(name.getMethodName()));
  }

  /** Disabling replication with a {@code null} table name must be rejected. */
  @Test(expected = IllegalArgumentException.class)
  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
    admin1.disableTableReplication(null);
  }

  /** Enabling replication with a {@code null} table name must be rejected. */
  @Test(expected = IllegalArgumentException.class)
  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
    admin1.enableTableReplication(null);
  }

  /*
   * Enabling table replication should create the table on the peer only when the table is part
   * of the table-cfs the user set explicitly for that peer. HBASE-14717
   */
  @Test
  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
    // Note: this local deliberately shadows the inherited TestReplicationBase.tableName; the
    // inherited table is always referred to by its qualified name below.
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String peerId = "2";
    if (admin2.isTableAvailable(TestReplicationBase.tableName)) {
      admin2.disableTable(TestReplicationBase.tableName);
      admin2.deleteTable(TestReplicationBase.tableName);
    }
    assertFalse("Table should not exists in the peer cluster",
      admin2.isTableAvailable(TestReplicationBase.tableName));

    // update peer config
    ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
    rpc.setReplicateAllUserTables(false);
    admin1.updateReplicationPeerConfig(peerId, rpc);

    // With a wildcard value type the only value that can be put is null, which is all this
    // test needs.
    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName, null);
    try {
      adminExt.setPeerTableCFs(peerId, tableCfs);
      admin1.enableTableReplication(TestReplicationBase.tableName);
      assertFalse(
        "Table should not be created if user has set table cfs explicitly for the "
          + "peer and this is not part of that collection",
        admin2.isTableAvailable(TestReplicationBase.tableName));

      tableCfs.put(TestReplicationBase.tableName, null);
      adminExt.setPeerTableCFs(peerId, tableCfs);
      admin1.enableTableReplication(TestReplicationBase.tableName);
      assertTrue(
        "Table should be created if user has explicitly added table into table cfs collection",
        admin2.isTableAvailable(TestReplicationBase.tableName));
    } finally {
      adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
      admin1.disableTableReplication(TestReplicationBase.tableName);

      rpc = admin1.getReplicationPeerConfig(peerId);
      rpc.setReplicateAllUserTables(true);
      admin1.updateReplicationPeerConfig(peerId, rpc);
    }
  }

  /**
   * Registers a peer whose endpoint is {@link TestUpdatableReplicationEndpoint}, updates the
   * peer configuration, and checks that the endpoint's {@code peerConfigUpdated} callback is
   * invoked within two seconds.
   */
  @Test
  public void testReplicationPeerConfigUpdateCallback() throws Exception {
    String peerId = "1";
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(UTIL2.getClusterKey());
    rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
    rpc.getConfiguration().put("key1", "value1");

    admin1.addReplicationPeer(peerId, rpc);

    try {
      rpc.getConfiguration().put("key1", "value2");
      // NOTE(review): `admin` is not declared in this file; presumably it is inherited from
      // TestReplicationBase - confirm.
      admin.updatePeerConfig(peerId, rpc);

      // Wait on the same monitor the endpoint notifies. The flag is re-checked inside the lock
      // so a notification cannot slip in between the check and the wait, and the loop guards
      // against spurious wakeups while still honouring the overall 2 second budget.
      synchronized (TestUpdatableReplicationEndpoint.class) {
        final long deadline = System.currentTimeMillis() + 2000L;
        long remaining = 2000L;
        while (!TestUpdatableReplicationEndpoint.hasCalledBack() && remaining > 0) {
          TestUpdatableReplicationEndpoint.class.wait(remaining);
          remaining = deadline - System.currentTimeMillis();
        }
      }

      assertTrue("peerConfigUpdated was not called within the timeout",
        TestUpdatableReplicationEndpoint.hasCalledBack());
    } finally {
      // Always unregister the peer so a failed assertion does not leak it into other tests.
      admin.removePeer(peerId);
    }
  }

  /**
   * Minimal replication endpoint that records whether {@link #peerConfigUpdated} has been
   * called. The flag is static and is never reset by this class.
   */
  public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {
    // volatile: written by whichever thread delivers the callback, read by the test thread.
    private static volatile boolean calledBack = false;

    /** Returns {@code true} once {@link #peerConfigUpdated} has been invoked on any instance. */
    public static boolean hasCalledBack() {
      return calledBack;
    }

    /**
     * Records the callback and wakes any thread waiting on this class's monitor.
     * <p>
     * The notification is issued on {@code TestUpdatableReplicationEndpoint.class} because that
     * is the monitor the test waits on; notifying the instance monitor would never wake it.
     */
    @Override
    public void peerConfigUpdated(ReplicationPeerConfig rpc) {
      synchronized (TestUpdatableReplicationEndpoint.class) {
        calledBack = true;
        TestUpdatableReplicationEndpoint.class.notifyAll();
      }
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      notifyStarted();
    }

    @Override
    protected void doStop() {
      notifyStopped();
    }

    @Override
    public UUID getPeerUUID() {
      return UTIL1.getRandomUUID();
    }

    /** Always reports the batch as not replicated; this endpoint only exists for the callback. */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      return false;
    }
  }
}