001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.CoreMatchers.startsWith;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertThat;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CompletionException;
039import java.util.concurrent.ExecutionException;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.replication.ReplicationException;
046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
047import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
048import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
049import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
050import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
051import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
052import org.apache.hadoop.hbase.testclassification.ClientTests;
053import org.apache.hadoop.hbase.testclassification.LargeTests;
054import org.junit.After;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.junit.runner.RunWith;
060import org.junit.runners.Parameterized;
061
062/**
063 * Class to test asynchronous replication admin operations.
064 */
065@RunWith(Parameterized.class)
066@Category({ LargeTests.class, ClientTests.class })
067public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
072
073  private final String ID_ONE = "1";
074  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
075  private final String ID_TWO = "2";
076  private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
077
078  @BeforeClass
079  public static void setUpBeforeClass() throws Exception {
080    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
081    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
082    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
083    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
084    TEST_UTIL.startMiniCluster();
085    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
086  }
087
088  @After
089  public void clearPeerAndQueues() throws IOException, ReplicationException {
090    try {
091      admin.removeReplicationPeer(ID_ONE).join();
092    } catch (Exception e) {
093    }
094    try {
095      admin.removeReplicationPeer(ID_TWO).join();
096    } catch (Exception e) {
097    }
098    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
099      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
100    for (ServerName serverName : queueStorage.getListOfReplicators()) {
101      for (String queue : queueStorage.getAllQueues(serverName)) {
102        queueStorage.removeQueue(serverName, queue);
103      }
104    }
105  }
106
107  @Test
108  public void testAddRemovePeer() throws Exception {
109    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
110    rpc1.setClusterKey(KEY_ONE);
111    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
112    rpc2.setClusterKey(KEY_TWO);
113    // Add a valid peer
114    admin.addReplicationPeer(ID_ONE, rpc1).join();
115    // try adding the same (fails)
116    try {
117      admin.addReplicationPeer(ID_ONE, rpc1).join();
118      fail("Test case should fail as adding a same peer.");
119    } catch (CompletionException e) {
120      // OK!
121    }
122    assertEquals(1, admin.listReplicationPeers().get().size());
123    // Try to remove an inexisting peer
124    try {
125      admin.removeReplicationPeer(ID_TWO).join();
126      fail("Test case should fail as removing a inexisting peer.");
127    } catch (CompletionException e) {
128      // OK!
129    }
130    assertEquals(1, admin.listReplicationPeers().get().size());
131    // Add a second since multi-slave is supported
132    admin.addReplicationPeer(ID_TWO, rpc2).join();
133    assertEquals(2, admin.listReplicationPeers().get().size());
134    // Remove the first peer we added
135    admin.removeReplicationPeer(ID_ONE).join();
136    assertEquals(1, admin.listReplicationPeers().get().size());
137    admin.removeReplicationPeer(ID_TWO).join();
138    assertEquals(0, admin.listReplicationPeers().get().size());
139  }
140
141  @Test
142  public void testPeerConfig() throws Exception {
143    ReplicationPeerConfig config = new ReplicationPeerConfig();
144    config.setClusterKey(KEY_ONE);
145    config.getConfiguration().put("key1", "value1");
146    config.getConfiguration().put("key2", "value2");
147    admin.addReplicationPeer(ID_ONE, config).join();
148
149    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
150    assertEquals(1, peers.size());
151    ReplicationPeerDescription peerOne = peers.get(0);
152    assertNotNull(peerOne);
153    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
154    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
155
156    admin.removeReplicationPeer(ID_ONE).join();
157  }
158
159  @Test
160  public void testEnableDisablePeer() throws Exception {
161    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
162    rpc1.setClusterKey(KEY_ONE);
163    admin.addReplicationPeer(ID_ONE, rpc1).join();
164    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
165    assertEquals(1, peers.size());
166    assertTrue(peers.get(0).isEnabled());
167
168    admin.disableReplicationPeer(ID_ONE).join();
169    peers = admin.listReplicationPeers().get();
170    assertEquals(1, peers.size());
171    assertFalse(peers.get(0).isEnabled());
172    admin.removeReplicationPeer(ID_ONE).join();
173  }
174
175  @Test
176  public void testAppendPeerTableCFs() throws Exception {
177    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
178    rpc1.setClusterKey(KEY_ONE);
179    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
180    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
181    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
182    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
183    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
184    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
185
186    // Add a valid peer
187    admin.addReplicationPeer(ID_ONE, rpc1).join();
188    rpc1.setReplicateAllUserTables(false);
189    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
190
191    Map<TableName, List<String>> tableCFs = new HashMap<>();
192
193    // append table t1 to replication
194    tableCFs.put(tableName1, null);
195    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
196    Map<TableName, List<String>> result =
197      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
198    assertEquals(1, result.size());
199    assertEquals(true, result.containsKey(tableName1));
200    assertNull(result.get(tableName1));
201
202    // append table t2 to replication
203    tableCFs.clear();
204    tableCFs.put(tableName2, null);
205    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
206    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
207    assertEquals(2, result.size());
208    assertTrue("Should contain t1", result.containsKey(tableName1));
209    assertTrue("Should contain t2", result.containsKey(tableName2));
210    assertNull(result.get(tableName1));
211    assertNull(result.get(tableName2));
212
213    // append table column family: f1 of t3 to replication
214    tableCFs.clear();
215    tableCFs.put(tableName3, new ArrayList<>());
216    tableCFs.get(tableName3).add("f1");
217    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
218    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
219    assertEquals(3, result.size());
220    assertTrue("Should contain t1", result.containsKey(tableName1));
221    assertTrue("Should contain t2", result.containsKey(tableName2));
222    assertTrue("Should contain t3", result.containsKey(tableName3));
223    assertNull(result.get(tableName1));
224    assertNull(result.get(tableName2));
225    assertEquals(1, result.get(tableName3).size());
226    assertEquals("f1", result.get(tableName3).get(0));
227
228    // append table column family: f1,f2 of t4 to replication
229    tableCFs.clear();
230    tableCFs.put(tableName4, new ArrayList<>());
231    tableCFs.get(tableName4).add("f1");
232    tableCFs.get(tableName4).add("f2");
233    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
234    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
235    assertEquals(4, result.size());
236    assertTrue("Should contain t1", result.containsKey(tableName1));
237    assertTrue("Should contain t2", result.containsKey(tableName2));
238    assertTrue("Should contain t3", result.containsKey(tableName3));
239    assertTrue("Should contain t4", result.containsKey(tableName4));
240    assertNull(result.get(tableName1));
241    assertNull(result.get(tableName2));
242    assertEquals(1, result.get(tableName3).size());
243    assertEquals("f1", result.get(tableName3).get(0));
244    assertEquals(2, result.get(tableName4).size());
245    assertEquals("f1", result.get(tableName4).get(0));
246    assertEquals("f2", result.get(tableName4).get(1));
247
248    // append "table5" => [], then append "table5" => ["f1"]
249    tableCFs.clear();
250    tableCFs.put(tableName5, new ArrayList<>());
251    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
252    tableCFs.clear();
253    tableCFs.put(tableName5, new ArrayList<>());
254    tableCFs.get(tableName5).add("f1");
255    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
256    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
257    assertEquals(5, result.size());
258    assertTrue("Should contain t5", result.containsKey(tableName5));
259    // null means replication all cfs of tab5
260    assertNull(result.get(tableName5));
261
262    // append "table6" => ["f1"], then append "table6" => []
263    tableCFs.clear();
264    tableCFs.put(tableName6, new ArrayList<>());
265    tableCFs.get(tableName6).add("f1");
266    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
267    tableCFs.clear();
268    tableCFs.put(tableName6, new ArrayList<>());
269    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
270    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
271    assertEquals(6, result.size());
272    assertTrue("Should contain t6", result.containsKey(tableName6));
273    // null means replication all cfs of tab6
274    assertNull(result.get(tableName6));
275
276    admin.removeReplicationPeer(ID_ONE).join();
277  }
278
279  @Test
280  public void testRemovePeerTableCFs() throws Exception {
281    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
282    rpc1.setClusterKey(KEY_ONE);
283    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
284    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
285    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
286    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
287    // Add a valid peer
288    admin.addReplicationPeer(ID_ONE, rpc1).join();
289    rpc1.setReplicateAllUserTables(false);
290    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
291
292    Map<TableName, List<String>> tableCFs = new HashMap<>();
293    try {
294      tableCFs.put(tableName3, null);
295      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
296      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
297    } catch (CompletionException e) {
298      assertTrue(e.getCause() instanceof ReplicationException);
299    }
300    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
301
302    tableCFs.clear();
303    tableCFs.put(tableName1, null);
304    tableCFs.put(tableName2, new ArrayList<>());
305    tableCFs.get(tableName2).add("cf1");
306    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
307    try {
308      tableCFs.clear();
309      tableCFs.put(tableName3, null);
310      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
311      fail("Test case should fail as removing table-cfs from a peer whose" +
312        " table-cfs didn't contain t3");
313    } catch (CompletionException e) {
314      assertTrue(e.getCause() instanceof ReplicationException);
315    }
316    Map<TableName, List<String>> result =
317      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
318    assertEquals(2, result.size());
319    assertTrue("Should contain t1", result.containsKey(tableName1));
320    assertTrue("Should contain t2", result.containsKey(tableName2));
321    assertNull(result.get(tableName1));
322    assertEquals(1, result.get(tableName2).size());
323    assertEquals("cf1", result.get(tableName2).get(0));
324
325    try {
326      tableCFs.clear();
327      tableCFs.put(tableName1, new ArrayList<>());
328      tableCFs.get(tableName1).add("cf1");
329      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
330      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
331    } catch (CompletionException e) {
332      assertTrue(e.getCause() instanceof ReplicationException);
333    }
334    tableCFs.clear();
335    tableCFs.put(tableName1, null);
336    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
337    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
338    assertEquals(1, result.size());
339    assertEquals(1, result.get(tableName2).size());
340    assertEquals("cf1", result.get(tableName2).get(0));
341
342    try {
343      tableCFs.clear();
344      tableCFs.put(tableName2, null);
345      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
346      fail("Test case should fail, because table t2 hase specified cfs in peer config");
347    } catch (CompletionException e) {
348      assertTrue(e.getCause() instanceof ReplicationException);
349    }
350    tableCFs.clear();
351    tableCFs.put(tableName2, new ArrayList<>());
352    tableCFs.get(tableName2).add("cf1");
353    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
354    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
355
356    tableCFs.clear();
357    tableCFs.put(tableName4, new ArrayList<>());
358    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
359    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
360    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
361
362    admin.removeReplicationPeer(ID_ONE);
363  }
364
365  @Test
366  public void testSetPeerNamespaces() throws Exception {
367    String ns1 = "ns1";
368    String ns2 = "ns2";
369
370    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
371    rpc.setClusterKey(KEY_ONE);
372    admin.addReplicationPeer(ID_ONE, rpc).join();
373    rpc.setReplicateAllUserTables(false);
374    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
375
376    // add ns1 and ns2 to peer config
377    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
378    Set<String> namespaces = new HashSet<>();
379    namespaces.add(ns1);
380    namespaces.add(ns2);
381    rpc.setNamespaces(namespaces);
382    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
383    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
384    assertEquals(2, namespaces.size());
385    assertTrue(namespaces.contains(ns1));
386    assertTrue(namespaces.contains(ns2));
387
388    // update peer config only contains ns1
389    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
390    namespaces = new HashSet<>();
391    namespaces.add(ns1);
392    rpc.setNamespaces(namespaces);
393    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
394    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
395    assertEquals(1, namespaces.size());
396    assertTrue(namespaces.contains(ns1));
397
398    admin.removeReplicationPeer(ID_ONE).join();
399  }
400
401  @Test
402  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
403    String ns1 = "ns1";
404    String ns2 = "ns2";
405    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
406    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
407
408    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
409    rpc.setClusterKey(KEY_ONE);
410    admin.addReplicationPeer(ID_ONE, rpc).join();
411    rpc.setReplicateAllUserTables(false);
412    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
413
414    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
415    Set<String> namespaces = new HashSet<String>();
416    namespaces.add(ns1);
417    rpc.setNamespaces(namespaces);
418    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
419    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
420    Map<TableName, List<String>> tableCfs = new HashMap<>();
421    tableCfs.put(tableName1, new ArrayList<>());
422    rpc.setTableCFsMap(tableCfs);
423    try {
424      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
425      fail(
426        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
427    } catch (CompletionException e) {
428      // OK
429    }
430
431    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
432    tableCfs.clear();
433    tableCfs.put(tableName2, new ArrayList<>());
434    rpc.setTableCFsMap(tableCfs);
435    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
436    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
437    namespaces.clear();
438    namespaces.add(ns2);
439    rpc.setNamespaces(namespaces);
440    try {
441      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
442      fail(
443        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
444    } catch (CompletionException e) {
445      // OK
446    }
447
448    admin.removeReplicationPeer(ID_ONE).join();
449  }
450
451  @Test
452  public void testPeerBandwidth() throws Exception {
453    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
454    rpc.setClusterKey(KEY_ONE);
455
456    admin.addReplicationPeer(ID_ONE, rpc).join();
457    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
458    assertEquals(0, rpc.getBandwidth());
459
460    rpc.setBandwidth(2097152);
461    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
462    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
463
464    admin.removeReplicationPeer(ID_ONE).join();
465  }
466
467  @Test
468  public void testInvalidClusterKey() throws InterruptedException {
469    try {
470      admin.addReplicationPeer(ID_ONE,
471        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
472      fail();
473    } catch (ExecutionException e) {
474      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
475    }
476  }
477
478  @Test
479  public void testInvalidReplicationEndpoint() throws InterruptedException {
480    try {
481      admin.addReplicationPeer(ID_ONE,
482        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
483      fail();
484    } catch (ExecutionException e) {
485      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
486      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
487    }
488  }
489
490  @Test
491  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
492    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
493    admin
494      .addReplicationPeer(ID_ONE,
495        ReplicationPeerConfig.newBuilder()
496          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
497      .get();
498
499    // but we still need to check cluster key if we specify the default ReplicationEndpoint
500    try {
501      admin
502        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
503          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
504        .get();
505      fail();
506    } catch (ExecutionException e) {
507      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
508    }
509  }
510}