/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Unit testing of ReplicationAdmin with clusters.
 * <p>
 * Names used below that are not declared in this file ({@code conf1}, {@code conf2},
 * {@code utility1}, {@code utility2}, {@code tableName}, {@code noRepfamName}, {@code admin})
 * are presumably inherited from {@link TestReplicationBase} -- confirm against that class.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestReplicationAdminWithClusters extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationAdminWithClusters.class);

  /** Upper bound, in milliseconds, on how long we wait for the peer-config callback. */
  private static final long CALLBACK_TIMEOUT_MS = 2000L;

  static Connection connection1;
  static Connection connection2;
  static Admin admin1;
  static Admin admin2;
  static ReplicationAdmin adminExt;

  @Rule
  public TestName name = new TestName();

  /**
   * Runs the base-class setup, then opens one connection and one {@link Admin} per
   * configuration ({@code conf1}, {@code conf2}) plus a {@link ReplicationAdmin} on
   * {@code conf1}.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();
    connection1 = ConnectionFactory.createConnection(conf1);
    connection2 = ConnectionFactory.createConnection(conf2);
    admin1 = connection1.getAdmin();
    admin2 = connection2.getAdmin();
    adminExt = new ReplicationAdmin(conf1);
  }

  /**
   * Closes the admins and connections opened in {@link #setUpBeforeClass()} and then runs the
   * base-class teardown.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    admin1.close();
    admin2.close();
    adminExt.close();
    connection1.close();
    connection2.close();
    TestReplicationBase.tearDownAfterClass();
  }

  /**
   * Adds an extra family to the table through {@code admin1} only (so the two clusters no
   * longer share the same descriptor), disables table replication, and checks that every
   * family ends up with {@link HConstants#REPLICATION_SCOPE_LOCAL}.
   */
  @Test
  public void disableNotFullReplication() throws Exception {
    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
    HColumnDescriptor f = new HColumnDescriptor("notReplicatedFamily");
    table.addFamily(f);
    admin1.disableTable(tableName);
    admin1.modifyTable(tableName, table);
    admin1.enableTable(tableName);

    admin1.disableTableReplication(tableName);
    table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
    }
  }

  /**
   * Deletes the table through {@code admin2} and checks that enabling table replication
   * through {@code admin1} makes it exist there again.
   */
  @Test
  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
    admin1.disableTableReplication(tableName);
    admin2.disableTable(tableName);
    admin2.deleteTable(tableName);
    assertFalse(admin2.tableExists(tableName));
    admin1.enableTableReplication(tableName);
    assertTrue(admin2.tableExists(tableName));
  }

  /**
   * Sets every family to {@link HConstants#REPLICATION_SCOPE_LOCAL} on both clusters, enables
   * table replication, and checks that every family is then
   * {@link HConstants#REPLICATION_SCOPE_GLOBAL}.
   */
  @Test
  public void testEnableReplicationWhenReplicationNotEnabled() throws Exception {
    HTableDescriptor table = new HTableDescriptor(admin1.getTableDescriptor(tableName));
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
    }
    admin1.disableTable(tableName);
    admin1.modifyTable(tableName, table);
    admin1.enableTable(tableName);

    admin2.disableTable(tableName);
    admin2.modifyTable(tableName, table);
    admin2.enableTable(tableName);

    admin1.enableTableReplication(tableName);
    table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }
  }

  /**
   * Adds a family through {@code admin2} only and expects enabling replication to fail with a
   * {@link RuntimeException}; then applies the same descriptor through {@code admin1} and
   * expects enabling replication to succeed with every family
   * {@link HConstants#REPLICATION_SCOPE_GLOBAL}.
   */
  @Test
  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
    HColumnDescriptor f = new HColumnDescriptor("newFamily");
    table.addFamily(f);
    admin2.disableTable(tableName);
    admin2.modifyTable(tableName, table);
    admin2.enableTable(tableName);

    try {
      admin1.enableTableReplication(tableName);
      fail("Exception should be thrown if table descriptors in the clusters are not same.");
    } catch (RuntimeException ignored) {
      // Expected: the two descriptors differ, so enabling replication must be rejected.
    }
    admin1.disableTable(tableName);
    admin1.modifyTable(tableName, table);
    admin1.enableTable(tableName);
    admin1.enableTableReplication(tableName);
    table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }
  }

  /**
   * Disables and then re-enables table replication, checking the family scopes flip to
   * {@link HConstants#REPLICATION_SCOPE_LOCAL} and back to
   * {@link HConstants#REPLICATION_SCOPE_GLOBAL}.
   */
  @Test
  public void testDisableAndEnableReplication() throws Exception {
    admin1.disableTableReplication(tableName);
    HTableDescriptor table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
    }
    admin1.enableTableReplication(tableName);
    table = admin1.getTableDescriptor(tableName);
    for (HColumnDescriptor fam : table.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }
  }

  /**
   * Creates a table with region replication set to 5, enables table replication on it, and
   * checks every family is {@link HConstants#REPLICATION_SCOPE_GLOBAL}. The table is deleted
   * through both utilities afterwards.
   */
  @Test
  public void testEnableReplicationForTableWithRegionReplica() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    TableDescriptor td = TableDescriptorBuilder.newBuilder(tn)
        .setRegionReplication(5)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).build())
        .build();

    admin1.createTable(td);

    try {
      admin1.enableTableReplication(tn);
      td = admin1.getDescriptor(tn);
      for (ColumnFamilyDescriptor fam : td.getColumnFamilies()) {
        assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
      }
    } finally {
      utility1.deleteTable(tn);
      utility2.deleteTable(tn);
    }
  }

  /** Disabling replication for a table that was never created must fail. */
  @Test(expected = TableNotFoundException.class)
  public void testDisableReplicationForNonExistingTable() throws Exception {
    admin1.disableTableReplication(TableName.valueOf(name.getMethodName()));
  }

  /** Enabling replication for a table that was never created must fail. */
  @Test(expected = TableNotFoundException.class)
  public void testEnableReplicationForNonExistingTable() throws Exception {
    admin1.enableTableReplication(TableName.valueOf(name.getMethodName()));
  }

  /** A {@code null} table name must be rejected when disabling replication. */
  @Test(expected = IllegalArgumentException.class)
  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
    admin1.disableTableReplication(null);
  }

  /** A {@code null} table name must be rejected when enabling replication. */
  @Test(expected = IllegalArgumentException.class)
  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
    admin1.enableTableReplication(null);
  }

  /**
   * Enabling table replication should create the table on the peer cluster only when the
   * table is part of the table-cfs the user explicitly configured for that peer
   * (HBASE-14717).
   */
  @Test
  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
    // Deliberately shadows the inherited tableName: this is a second, unrelated table name
    // used to populate the peer's table-cfs without including the replicated table.
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String peerId = "2";
    if (admin2.isTableAvailable(TestReplicationBase.tableName)) {
      admin2.disableTable(TestReplicationBase.tableName);
      admin2.deleteTable(TestReplicationBase.tableName);
    }
    assertFalse("Table should not exists in the peer cluster",
      admin2.isTableAvailable(TestReplicationBase.tableName));

    // update peer config
    ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
    rpc.setReplicateAllUserTables(false);
    admin1.updateReplicationPeerConfig(peerId, rpc);

    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName, null);
    try {
      adminExt.setPeerTableCFs(peerId, tableCfs);
      admin1.enableTableReplication(TestReplicationBase.tableName);
      assertFalse("Table should not be created if user has set table cfs explicitly for the "
          + "peer and this is not part of that collection",
        admin2.isTableAvailable(TestReplicationBase.tableName));

      tableCfs.put(TestReplicationBase.tableName, null);
      adminExt.setPeerTableCFs(peerId, tableCfs);
      admin1.enableTableReplication(TestReplicationBase.tableName);
      assertTrue(
        "Table should be created if user has explicitly added table into table cfs collection",
        admin2.isTableAvailable(TestReplicationBase.tableName));
    } finally {
      // Restore the peer to its original "replicate everything" configuration.
      adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
      admin1.disableTableReplication(TestReplicationBase.tableName);

      rpc = admin1.getReplicationPeerConfig(peerId);
      rpc.setReplicateAllUserTables(true);
      admin1.updateReplicationPeerConfig(peerId, rpc);
    }
  }

  /**
   * Adds a peer whose endpoint is {@link TestUpdatableReplicationEndpoint}, updates the peer
   * configuration, and checks that the endpoint's {@code peerConfigUpdated} callback fires
   * within {@link #CALLBACK_TIMEOUT_MS}.
   */
  @Test
  public void testReplicationPeerConfigUpdateCallback() throws Exception {
    String peerId = "1";
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(utility2.getClusterKey());
    rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
    rpc.getConfiguration().put("key1", "value1");

    admin1.addReplicationPeer(peerId, rpc);

    rpc.getConfiguration().put("key1", "value2");
    // NOTE(review): `admin` is not declared in this file; presumably the ReplicationAdmin
    // inherited from TestReplicationBase -- confirm.
    admin.updatePeerConfig(peerId, rpc);

    assertTrue("peerConfigUpdated was not invoked within " + CALLBACK_TIMEOUT_MS + " ms",
      TestUpdatableReplicationEndpoint.awaitCallback(CALLBACK_TIMEOUT_MS));
  }

  /**
   * Minimal replication endpoint that records whether {@code peerConfigUpdated} has been
   * invoked. The flag is static because the test never sees the instance: the endpoint is
   * configured by class name only.
   * <p>
   * Signalling and waiting both use the {@code TestUpdatableReplicationEndpoint.class}
   * monitor. (Previously the callback notified on the instance monitor while the test waited
   * on the class monitor, so the waiter could never be woken and always ran out its timeout.)
   */
  public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {
    // volatile so the lock-free read in hasCalledBack() observes the callback thread's write.
    private static volatile boolean calledBack = false;

    /** @return {@code true} once {@code peerConfigUpdated} has been invoked on any instance. */
    public static boolean hasCalledBack() {
      return calledBack;
    }

    /**
     * Blocks until the callback has fired or the timeout elapses.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     * @return {@code true} if the callback fired, {@code false} on timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public static boolean awaitCallback(long timeoutMs) throws InterruptedException {
      final long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
      synchronized (TestUpdatableReplicationEndpoint.class) {
        // Loop guards against spurious wakeups; the flag is re-checked under the same monitor
        // the callback uses, so a notification cannot slip in between check and wait.
        while (!calledBack) {
          long remainingMs = (deadlineNanos - System.nanoTime()) / 1_000_000L;
          if (remainingMs <= 0) {
            // Must not call wait(0): that would block indefinitely.
            break;
          }
          TestUpdatableReplicationEndpoint.class.wait(remainingMs);
        }
        return calledBack;
      }
    }

    @Override
    public void peerConfigUpdated(ReplicationPeerConfig rpc) {
      synchronized (TestUpdatableReplicationEndpoint.class) {
        calledBack = true;
        TestUpdatableReplicationEndpoint.class.notifyAll();
      }
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      notifyStarted();
    }

    @Override
    protected void doStop() {
      notifyStopped();
    }

    @Override
    public UUID getPeerUUID() {
      return UUID.randomUUID();
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      return false;
    }
  }
}