001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.util.ArrayList; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.CompletionException; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.replication.ReplicationException; 039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 040import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.LargeTests; 043import org.junit.After; 044import org.junit.BeforeClass; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.runner.RunWith; 049import org.junit.runners.Parameterized; 050 051/** 052 * Class to test asynchronous replication admin operations. 053 */ 054@RunWith(Parameterized.class) 055@Category({LargeTests.class, ClientTests.class}) 056public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class); 061 062 private final String ID_ONE = "1"; 063 private final String KEY_ONE = "127.0.0.1:2181:/hbase"; 064 private final String ID_SECOND = "2"; 065 private final String KEY_SECOND = "127.0.0.1:2181:/hbase2"; 066 067 @BeforeClass 068 public static void setUpBeforeClass() throws Exception { 069 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 070 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 071 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 072 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 073 TEST_UTIL.startMiniCluster(); 074 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 075 } 076 077 @After 078 public void cleanupPeer() { 079 try { 080 admin.removeReplicationPeer(ID_ONE).join(); 081 } catch (Exception e) { 082 LOG.debug("Replication peer " + ID_ONE + " may already be removed"); 083 } 084 try { 085 admin.removeReplicationPeer(ID_SECOND).join(); 086 } catch (Exception e) { 087 LOG.debug("Replication peer " + ID_SECOND + " may already be removed"); 088 } 089 } 090 091 @Test 092 public void testAddRemovePeer() throws Exception { 093 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 094 rpc1.setClusterKey(KEY_ONE); 095 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 096 rpc2.setClusterKey(KEY_SECOND); 097 // Add a valid peer 098 admin.addReplicationPeer(ID_ONE, rpc1).join(); 099 // try adding the same (fails) 100 try { 101 admin.addReplicationPeer(ID_ONE, rpc1).join(); 102 fail("Test case should fail as adding a same peer."); 103 } catch (CompletionException e) { 104 // OK! 105 } 106 assertEquals(1, admin.listReplicationPeers().get().size()); 107 // Try to remove an inexisting peer 108 try { 109 admin.removeReplicationPeer(ID_SECOND).join(); 110 fail("Test case should fail as removing a inexisting peer."); 111 } catch (CompletionException e) { 112 // OK! 113 } 114 assertEquals(1, admin.listReplicationPeers().get().size()); 115 // Add a second since multi-slave is supported 116 admin.addReplicationPeer(ID_SECOND, rpc2).join(); 117 assertEquals(2, admin.listReplicationPeers().get().size()); 118 // Remove the first peer we added 119 admin.removeReplicationPeer(ID_ONE).join(); 120 assertEquals(1, admin.listReplicationPeers().get().size()); 121 admin.removeReplicationPeer(ID_SECOND).join(); 122 assertEquals(0, admin.listReplicationPeers().get().size()); 123 } 124 125 @Test 126 public void testPeerConfig() throws Exception { 127 ReplicationPeerConfig config = new ReplicationPeerConfig(); 128 config.setClusterKey(KEY_ONE); 129 config.getConfiguration().put("key1", "value1"); 130 config.getConfiguration().put("key2", "value2"); 131 admin.addReplicationPeer(ID_ONE, config).join(); 132 133 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 134 assertEquals(1, peers.size()); 135 ReplicationPeerDescription peerOne = peers.get(0); 136 assertNotNull(peerOne); 137 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 138 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 139 140 admin.removeReplicationPeer(ID_ONE).join(); 141 } 142 143 @Test 144 public void testEnableDisablePeer() throws Exception { 145 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 146 rpc1.setClusterKey(KEY_ONE); 147 admin.addReplicationPeer(ID_ONE, rpc1).join(); 148 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 149 assertEquals(1, peers.size()); 150 assertTrue(peers.get(0).isEnabled()); 151 152 admin.disableReplicationPeer(ID_ONE).join(); 153 peers = admin.listReplicationPeers().get(); 154 assertEquals(1, peers.size()); 155 assertFalse(peers.get(0).isEnabled()); 156 admin.removeReplicationPeer(ID_ONE).join(); 157 } 158 159 @Test 160 public void testAppendPeerTableCFs() throws Exception { 161 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 162 rpc1.setClusterKey(KEY_ONE); 163 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 164 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 165 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 166 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 167 final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5"); 168 final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6"); 169 170 // Add a valid peer 171 admin.addReplicationPeer(ID_ONE, rpc1).join(); 172 rpc1.setReplicateAllUserTables(false); 173 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); 174 175 Map<TableName, List<String>> tableCFs = new HashMap<>(); 176 177 // append table t1 to replication 178 tableCFs.put(tableName1, null); 179 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 180 Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get() 181 .getTableCFsMap(); 182 assertEquals(1, result.size()); 183 assertEquals(true, result.containsKey(tableName1)); 184 assertNull(result.get(tableName1)); 185 186 // append table t2 to replication 187 tableCFs.clear(); 188 tableCFs.put(tableName2, null); 189 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 190 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 191 assertEquals(2, result.size()); 192 assertTrue("Should contain t1", result.containsKey(tableName1)); 193 assertTrue("Should contain t2", result.containsKey(tableName2)); 194 assertNull(result.get(tableName1)); 195 assertNull(result.get(tableName2)); 196 197 // append table column family: f1 of t3 to replication 198 tableCFs.clear(); 199 tableCFs.put(tableName3, new ArrayList<>()); 200 tableCFs.get(tableName3).add("f1"); 201 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 202 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 203 assertEquals(3, result.size()); 204 assertTrue("Should contain t1", result.containsKey(tableName1)); 205 assertTrue("Should contain t2", result.containsKey(tableName2)); 206 assertTrue("Should contain t3", result.containsKey(tableName3)); 207 assertNull(result.get(tableName1)); 208 assertNull(result.get(tableName2)); 209 assertEquals(1, result.get(tableName3).size()); 210 assertEquals("f1", result.get(tableName3).get(0)); 211 212 // append table column family: f1,f2 of t4 to replication 213 tableCFs.clear(); 214 tableCFs.put(tableName4, new ArrayList<>()); 215 tableCFs.get(tableName4).add("f1"); 216 tableCFs.get(tableName4).add("f2"); 217 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 218 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 219 assertEquals(4, result.size()); 220 assertTrue("Should contain t1", result.containsKey(tableName1)); 221 assertTrue("Should contain t2", result.containsKey(tableName2)); 222 assertTrue("Should contain t3", result.containsKey(tableName3)); 223 assertTrue("Should contain t4", result.containsKey(tableName4)); 224 assertNull(result.get(tableName1)); 225 assertNull(result.get(tableName2)); 226 assertEquals(1, result.get(tableName3).size()); 227 assertEquals("f1", result.get(tableName3).get(0)); 228 assertEquals(2, result.get(tableName4).size()); 229 assertEquals("f1", result.get(tableName4).get(0)); 230 assertEquals("f2", result.get(tableName4).get(1)); 231 232 // append "table5" => [], then append "table5" => ["f1"] 233 tableCFs.clear(); 234 tableCFs.put(tableName5, new ArrayList<>()); 235 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 236 tableCFs.clear(); 237 tableCFs.put(tableName5, new ArrayList<>()); 238 tableCFs.get(tableName5).add("f1"); 239 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 240 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 241 assertEquals(5, result.size()); 242 assertTrue("Should contain t5", result.containsKey(tableName5)); 243 // null means replication all cfs of tab5 244 assertNull(result.get(tableName5)); 245 246 // append "table6" => ["f1"], then append "table6" => [] 247 tableCFs.clear(); 248 tableCFs.put(tableName6, new ArrayList<>()); 249 tableCFs.get(tableName6).add("f1"); 250 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 251 tableCFs.clear(); 252 tableCFs.put(tableName6, new ArrayList<>()); 253 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 254 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 255 assertEquals(6, result.size()); 256 assertTrue("Should contain t6", result.containsKey(tableName6)); 257 // null means replication all cfs of tab6 258 assertNull(result.get(tableName6)); 259 260 admin.removeReplicationPeer(ID_ONE).join(); 261 } 262 263 @Test 264 public void testRemovePeerTableCFs() throws Exception { 265 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 266 rpc1.setClusterKey(KEY_ONE); 267 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 268 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 269 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 270 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 271 // Add a valid peer 272 admin.addReplicationPeer(ID_ONE, rpc1).join(); 273 rpc1.setReplicateAllUserTables(false); 274 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); 275 276 Map<TableName, List<String>> tableCFs = new HashMap<>(); 277 try { 278 tableCFs.put(tableName3, null); 279 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 280 fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null"); 281 } catch (CompletionException e) { 282 assertTrue(e.getCause() instanceof ReplicationException); 283 } 284 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 285 286 tableCFs.clear(); 287 tableCFs.put(tableName1, null); 288 tableCFs.put(tableName2, new ArrayList<>()); 289 tableCFs.get(tableName2).add("cf1"); 290 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 291 try { 292 tableCFs.clear(); 293 tableCFs.put(tableName3, null); 294 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 295 fail("Test case should fail as removing table-cfs from a peer whose table-cfs didn't contain t3"); 296 } catch (CompletionException e) { 297 assertTrue(e.getCause() instanceof ReplicationException); 298 } 299 Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get() 300 .getTableCFsMap(); 301 assertEquals(2, result.size()); 302 assertTrue("Should contain t1", result.containsKey(tableName1)); 303 assertTrue("Should contain t2", result.containsKey(tableName2)); 304 assertNull(result.get(tableName1)); 305 assertEquals(1, result.get(tableName2).size()); 306 assertEquals("cf1", result.get(tableName2).get(0)); 307 308 try { 309 tableCFs.clear(); 310 tableCFs.put(tableName1, new ArrayList<>()); 311 tableCFs.get(tableName1).add("cf1"); 312 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 313 fail("Test case should fail, because table t1 didn't specify cfs in peer config"); 314 } catch (CompletionException e) { 315 assertTrue(e.getCause() instanceof ReplicationException); 316 } 317 tableCFs.clear(); 318 tableCFs.put(tableName1, null); 319 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 320 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 321 assertEquals(1, result.size()); 322 assertEquals(1, result.get(tableName2).size()); 323 assertEquals("cf1", result.get(tableName2).get(0)); 324 325 try { 326 tableCFs.clear(); 327 tableCFs.put(tableName2, null); 328 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 329 fail("Test case should fail, because table t2 hase specified cfs in peer config"); 330 } catch (CompletionException e) { 331 assertTrue(e.getCause() instanceof ReplicationException); 332 } 333 tableCFs.clear(); 334 tableCFs.put(tableName2, new ArrayList<>()); 335 tableCFs.get(tableName2).add("cf1"); 336 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 337 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 338 339 tableCFs.clear(); 340 tableCFs.put(tableName4, new ArrayList<>()); 341 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 342 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 343 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 344 345 admin.removeReplicationPeer(ID_ONE); 346 } 347 348 @Test 349 public void testSetPeerNamespaces() throws Exception { 350 String ns1 = "ns1"; 351 String ns2 = "ns2"; 352 353 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 354 rpc.setClusterKey(KEY_ONE); 355 admin.addReplicationPeer(ID_ONE, rpc).join(); 356 rpc.setReplicateAllUserTables(false); 357 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 358 359 // add ns1 and ns2 to peer config 360 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 361 Set<String> namespaces = new HashSet<>(); 362 namespaces.add(ns1); 363 namespaces.add(ns2); 364 rpc.setNamespaces(namespaces); 365 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 366 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 367 assertEquals(2, namespaces.size()); 368 assertTrue(namespaces.contains(ns1)); 369 assertTrue(namespaces.contains(ns2)); 370 371 // update peer config only contains ns1 372 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 373 namespaces = new HashSet<>(); 374 namespaces.add(ns1); 375 rpc.setNamespaces(namespaces); 376 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 377 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 378 assertEquals(1, namespaces.size()); 379 assertTrue(namespaces.contains(ns1)); 380 381 admin.removeReplicationPeer(ID_ONE).join(); 382 } 383 384 @Test 385 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 386 String ns1 = "ns1"; 387 String ns2 = "ns2"; 388 final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1"); 389 final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2"); 390 391 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 392 rpc.setClusterKey(KEY_ONE); 393 admin.addReplicationPeer(ID_ONE, rpc).join(); 394 rpc.setReplicateAllUserTables(false); 395 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 396 397 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 398 Set<String> namespaces = new HashSet<String>(); 399 namespaces.add(ns1); 400 rpc.setNamespaces(namespaces); 401 admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); 402 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 403 Map<TableName, List<String>> tableCfs = new HashMap<>(); 404 tableCfs.put(tableName1, new ArrayList<>()); 405 rpc.setTableCFsMap(tableCfs); 406 try { 407 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 408 fail("Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1); 409 } catch (CompletionException e) { 410 // OK 411 } 412 413 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 414 tableCfs.clear(); 415 tableCfs.put(tableName2, new ArrayList<>()); 416 rpc.setTableCFsMap(tableCfs); 417 admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); 418 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 419 namespaces.clear(); 420 namespaces.add(ns2); 421 rpc.setNamespaces(namespaces); 422 try { 423 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 424 fail("Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2); 425 } catch (CompletionException e) { 426 // OK 427 } 428 429 admin.removeReplicationPeer(ID_ONE).join(); 430 } 431 432 @Test 433 public void testPeerBandwidth() throws Exception { 434 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 435 rpc.setClusterKey(KEY_ONE); 436 437 admin.addReplicationPeer(ID_ONE, rpc).join(); 438 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 439 assertEquals(0, rpc.getBandwidth()); 440 441 rpc.setBandwidth(2097152); 442 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 443 assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth()); 444 445 admin.removeReplicationPeer(ID_ONE).join(); 446 } 447}