001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.util.ArrayList;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.concurrent.CompletionException;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.replication.ReplicationException;
039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
040import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.junit.After;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.runner.RunWith;
049import org.junit.runners.Parameterized;
050
051/**
052 * Class to test asynchronous replication admin operations.
053 */
054@RunWith(Parameterized.class)
055@Category({LargeTests.class, ClientTests.class})
056public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060      HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
061
062  private final String ID_ONE = "1";
063  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
064  private final String ID_SECOND = "2";
065  private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
066
067  @BeforeClass
068  public static void setUpBeforeClass() throws Exception {
069    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
070    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
071    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
072    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
073    TEST_UTIL.startMiniCluster();
074    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
075  }
076
077  @After
078  public void cleanupPeer() {
079    try {
080      admin.removeReplicationPeer(ID_ONE).join();
081    } catch (Exception e) {
082      LOG.debug("Replication peer " + ID_ONE + " may already be removed");
083    }
084    try {
085      admin.removeReplicationPeer(ID_SECOND).join();
086    } catch (Exception e) {
087      LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
088    }
089  }
090
091  @Test
092  public void testAddRemovePeer() throws Exception {
093    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
094    rpc1.setClusterKey(KEY_ONE);
095    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
096    rpc2.setClusterKey(KEY_SECOND);
097    // Add a valid peer
098    admin.addReplicationPeer(ID_ONE, rpc1).join();
099    // try adding the same (fails)
100    try {
101      admin.addReplicationPeer(ID_ONE, rpc1).join();
102      fail("Test case should fail as adding a same peer.");
103    } catch (CompletionException e) {
104      // OK!
105    }
106    assertEquals(1, admin.listReplicationPeers().get().size());
107    // Try to remove an inexisting peer
108    try {
109      admin.removeReplicationPeer(ID_SECOND).join();
110      fail("Test case should fail as removing a inexisting peer.");
111    } catch (CompletionException e) {
112      // OK!
113    }
114    assertEquals(1, admin.listReplicationPeers().get().size());
115    // Add a second since multi-slave is supported
116    admin.addReplicationPeer(ID_SECOND, rpc2).join();
117    assertEquals(2, admin.listReplicationPeers().get().size());
118    // Remove the first peer we added
119    admin.removeReplicationPeer(ID_ONE).join();
120    assertEquals(1, admin.listReplicationPeers().get().size());
121    admin.removeReplicationPeer(ID_SECOND).join();
122    assertEquals(0, admin.listReplicationPeers().get().size());
123  }
124
125  @Test
126  public void testPeerConfig() throws Exception {
127    ReplicationPeerConfig config = new ReplicationPeerConfig();
128    config.setClusterKey(KEY_ONE);
129    config.getConfiguration().put("key1", "value1");
130    config.getConfiguration().put("key2", "value2");
131    admin.addReplicationPeer(ID_ONE, config).join();
132
133    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
134    assertEquals(1, peers.size());
135    ReplicationPeerDescription peerOne = peers.get(0);
136    assertNotNull(peerOne);
137    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
138    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
139
140    admin.removeReplicationPeer(ID_ONE).join();
141  }
142
143  @Test
144  public void testEnableDisablePeer() throws Exception {
145    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
146    rpc1.setClusterKey(KEY_ONE);
147    admin.addReplicationPeer(ID_ONE, rpc1).join();
148    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
149    assertEquals(1, peers.size());
150    assertTrue(peers.get(0).isEnabled());
151
152    admin.disableReplicationPeer(ID_ONE).join();
153    peers = admin.listReplicationPeers().get();
154    assertEquals(1, peers.size());
155    assertFalse(peers.get(0).isEnabled());
156    admin.removeReplicationPeer(ID_ONE).join();
157  }
158
159  @Test
160  public void testAppendPeerTableCFs() throws Exception {
161    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
162    rpc1.setClusterKey(KEY_ONE);
163    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
164    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
165    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
166    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
167    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
168    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
169
170    // Add a valid peer
171    admin.addReplicationPeer(ID_ONE, rpc1).join();
172    rpc1.setReplicateAllUserTables(false);
173    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
174
175    Map<TableName, List<String>> tableCFs = new HashMap<>();
176
177    // append table t1 to replication
178    tableCFs.put(tableName1, null);
179    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
180    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
181        .getTableCFsMap();
182    assertEquals(1, result.size());
183    assertEquals(true, result.containsKey(tableName1));
184    assertNull(result.get(tableName1));
185
186    // append table t2 to replication
187    tableCFs.clear();
188    tableCFs.put(tableName2, null);
189    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
190    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
191    assertEquals(2, result.size());
192    assertTrue("Should contain t1", result.containsKey(tableName1));
193    assertTrue("Should contain t2", result.containsKey(tableName2));
194    assertNull(result.get(tableName1));
195    assertNull(result.get(tableName2));
196
197    // append table column family: f1 of t3 to replication
198    tableCFs.clear();
199    tableCFs.put(tableName3, new ArrayList<>());
200    tableCFs.get(tableName3).add("f1");
201    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
202    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
203    assertEquals(3, result.size());
204    assertTrue("Should contain t1", result.containsKey(tableName1));
205    assertTrue("Should contain t2", result.containsKey(tableName2));
206    assertTrue("Should contain t3", result.containsKey(tableName3));
207    assertNull(result.get(tableName1));
208    assertNull(result.get(tableName2));
209    assertEquals(1, result.get(tableName3).size());
210    assertEquals("f1", result.get(tableName3).get(0));
211
212    // append table column family: f1,f2 of t4 to replication
213    tableCFs.clear();
214    tableCFs.put(tableName4, new ArrayList<>());
215    tableCFs.get(tableName4).add("f1");
216    tableCFs.get(tableName4).add("f2");
217    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
218    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
219    assertEquals(4, result.size());
220    assertTrue("Should contain t1", result.containsKey(tableName1));
221    assertTrue("Should contain t2", result.containsKey(tableName2));
222    assertTrue("Should contain t3", result.containsKey(tableName3));
223    assertTrue("Should contain t4", result.containsKey(tableName4));
224    assertNull(result.get(tableName1));
225    assertNull(result.get(tableName2));
226    assertEquals(1, result.get(tableName3).size());
227    assertEquals("f1", result.get(tableName3).get(0));
228    assertEquals(2, result.get(tableName4).size());
229    assertEquals("f1", result.get(tableName4).get(0));
230    assertEquals("f2", result.get(tableName4).get(1));
231
232    // append "table5" => [], then append "table5" => ["f1"]
233    tableCFs.clear();
234    tableCFs.put(tableName5, new ArrayList<>());
235    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
236    tableCFs.clear();
237    tableCFs.put(tableName5, new ArrayList<>());
238    tableCFs.get(tableName5).add("f1");
239    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
240    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
241    assertEquals(5, result.size());
242    assertTrue("Should contain t5", result.containsKey(tableName5));
243    // null means replication all cfs of tab5
244    assertNull(result.get(tableName5));
245
246    // append "table6" => ["f1"], then append "table6" => []
247    tableCFs.clear();
248    tableCFs.put(tableName6, new ArrayList<>());
249    tableCFs.get(tableName6).add("f1");
250    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
251    tableCFs.clear();
252    tableCFs.put(tableName6, new ArrayList<>());
253    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
254    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
255    assertEquals(6, result.size());
256    assertTrue("Should contain t6", result.containsKey(tableName6));
257    // null means replication all cfs of tab6
258    assertNull(result.get(tableName6));
259
260    admin.removeReplicationPeer(ID_ONE).join();
261  }
262
263  @Test
264  public void testRemovePeerTableCFs() throws Exception {
265    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
266    rpc1.setClusterKey(KEY_ONE);
267    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
268    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
269    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
270    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
271    // Add a valid peer
272    admin.addReplicationPeer(ID_ONE, rpc1).join();
273    rpc1.setReplicateAllUserTables(false);
274    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
275
276    Map<TableName, List<String>> tableCFs = new HashMap<>();
277    try {
278      tableCFs.put(tableName3, null);
279      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
280      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
281    } catch (CompletionException e) {
282      assertTrue(e.getCause() instanceof ReplicationException);
283    }
284    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
285
286    tableCFs.clear();
287    tableCFs.put(tableName1, null);
288    tableCFs.put(tableName2, new ArrayList<>());
289    tableCFs.get(tableName2).add("cf1");
290    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
291    try {
292      tableCFs.clear();
293      tableCFs.put(tableName3, null);
294      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
295      fail("Test case should fail as removing table-cfs from a peer whose table-cfs didn't contain t3");
296    } catch (CompletionException e) {
297      assertTrue(e.getCause() instanceof ReplicationException);
298    }
299    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
300        .getTableCFsMap();
301    assertEquals(2, result.size());
302    assertTrue("Should contain t1", result.containsKey(tableName1));
303    assertTrue("Should contain t2", result.containsKey(tableName2));
304    assertNull(result.get(tableName1));
305    assertEquals(1, result.get(tableName2).size());
306    assertEquals("cf1", result.get(tableName2).get(0));
307
308    try {
309      tableCFs.clear();
310      tableCFs.put(tableName1, new ArrayList<>());
311      tableCFs.get(tableName1).add("cf1");
312      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
313      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
314    } catch (CompletionException e) {
315      assertTrue(e.getCause() instanceof ReplicationException);
316    }
317    tableCFs.clear();
318    tableCFs.put(tableName1, null);
319    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
320    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
321    assertEquals(1, result.size());
322    assertEquals(1, result.get(tableName2).size());
323    assertEquals("cf1", result.get(tableName2).get(0));
324
325    try {
326      tableCFs.clear();
327      tableCFs.put(tableName2, null);
328      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
329      fail("Test case should fail, because table t2 hase specified cfs in peer config");
330    } catch (CompletionException e) {
331      assertTrue(e.getCause() instanceof ReplicationException);
332    }
333    tableCFs.clear();
334    tableCFs.put(tableName2, new ArrayList<>());
335    tableCFs.get(tableName2).add("cf1");
336    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
337    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
338
339    tableCFs.clear();
340    tableCFs.put(tableName4, new ArrayList<>());
341    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
342    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
343    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
344
345    admin.removeReplicationPeer(ID_ONE);
346  }
347
348  @Test
349  public void testSetPeerNamespaces() throws Exception {
350    String ns1 = "ns1";
351    String ns2 = "ns2";
352
353    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
354    rpc.setClusterKey(KEY_ONE);
355    admin.addReplicationPeer(ID_ONE, rpc).join();
356    rpc.setReplicateAllUserTables(false);
357    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
358
359    // add ns1 and ns2 to peer config
360    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
361    Set<String> namespaces = new HashSet<>();
362    namespaces.add(ns1);
363    namespaces.add(ns2);
364    rpc.setNamespaces(namespaces);
365    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
366    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
367    assertEquals(2, namespaces.size());
368    assertTrue(namespaces.contains(ns1));
369    assertTrue(namespaces.contains(ns2));
370
371    // update peer config only contains ns1
372    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
373    namespaces = new HashSet<>();
374    namespaces.add(ns1);
375    rpc.setNamespaces(namespaces);
376    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
377    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
378    assertEquals(1, namespaces.size());
379    assertTrue(namespaces.contains(ns1));
380
381    admin.removeReplicationPeer(ID_ONE).join();
382  }
383
384  @Test
385  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
386    String ns1 = "ns1";
387    String ns2 = "ns2";
388    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
389    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
390
391    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
392    rpc.setClusterKey(KEY_ONE);
393    admin.addReplicationPeer(ID_ONE, rpc).join();
394    rpc.setReplicateAllUserTables(false);
395    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
396
397    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
398    Set<String> namespaces = new HashSet<String>();
399    namespaces.add(ns1);
400    rpc.setNamespaces(namespaces);
401    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
402    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
403    Map<TableName, List<String>> tableCfs = new HashMap<>();
404    tableCfs.put(tableName1, new ArrayList<>());
405    rpc.setTableCFsMap(tableCfs);
406    try {
407      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
408      fail("Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
409    } catch (CompletionException e) {
410      // OK
411    }
412
413    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
414    tableCfs.clear();
415    tableCfs.put(tableName2, new ArrayList<>());
416    rpc.setTableCFsMap(tableCfs);
417    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
418    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
419    namespaces.clear();
420    namespaces.add(ns2);
421    rpc.setNamespaces(namespaces);
422    try {
423      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
424      fail("Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
425    } catch (CompletionException e) {
426      // OK
427    }
428
429    admin.removeReplicationPeer(ID_ONE).join();
430  }
431
432  @Test
433  public void testPeerBandwidth() throws Exception {
434    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
435    rpc.setClusterKey(KEY_ONE);
436
437    admin.addReplicationPeer(ID_ONE, rpc).join();
438    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
439    assertEquals(0, rpc.getBandwidth());
440
441    rpc.setBandwidth(2097152);
442    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
443    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
444
445    admin.removeReplicationPeer(ID_ONE).join();
446  }
447}