001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.CoreMatchers.startsWith;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertThat;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CompletionException;
039import java.util.concurrent.ExecutionException;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.replication.ReplicationException;
046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
047import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
048import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
049import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
050import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
051import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
052import org.apache.hadoop.hbase.testclassification.ClientTests;
053import org.apache.hadoop.hbase.testclassification.LargeTests;
054import org.junit.After;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.junit.runner.RunWith;
060import org.junit.runners.Parameterized;
061
062/**
063 * Class to test asynchronous replication admin operations.
064 */
065@RunWith(Parameterized.class)
066@Category({ LargeTests.class, ClientTests.class })
067public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
072
073  private final String ID_ONE = "1";
074  private static String KEY_ONE;
075  private final String ID_TWO = "2";
076  private static String KEY_TWO;
077
078  @BeforeClass
079  public static void setUpBeforeClass() throws Exception {
080    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
081    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
082    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
083    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
084    TEST_UTIL.startMiniCluster();
085    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
086    KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
087    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
088  }
089
090  @After
091  public void clearPeerAndQueues() throws IOException, ReplicationException {
092    try {
093      admin.removeReplicationPeer(ID_ONE).join();
094    } catch (Exception e) {
095    }
096    try {
097      admin.removeReplicationPeer(ID_TWO).join();
098    } catch (Exception e) {
099    }
100    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
101      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
102    for (ServerName serverName : queueStorage.getListOfReplicators()) {
103      for (String queue : queueStorage.getAllQueues(serverName)) {
104        queueStorage.removeQueue(serverName, queue);
105      }
106    }
107  }
108
109  @Test
110  public void testAddRemovePeer() throws Exception {
111    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
112    rpc1.setClusterKey(KEY_ONE);
113    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
114    rpc2.setClusterKey(KEY_TWO);
115    // Add a valid peer
116    admin.addReplicationPeer(ID_ONE, rpc1).join();
117    // try adding the same (fails)
118    try {
119      admin.addReplicationPeer(ID_ONE, rpc1).join();
120      fail("Test case should fail as adding a same peer.");
121    } catch (CompletionException e) {
122      // OK!
123    }
124    assertEquals(1, admin.listReplicationPeers().get().size());
125    // Try to remove an inexisting peer
126    try {
127      admin.removeReplicationPeer(ID_TWO).join();
128      fail("Test case should fail as removing a inexisting peer.");
129    } catch (CompletionException e) {
130      // OK!
131    }
132    assertEquals(1, admin.listReplicationPeers().get().size());
133    // Add a second since multi-slave is supported
134    admin.addReplicationPeer(ID_TWO, rpc2).join();
135    assertEquals(2, admin.listReplicationPeers().get().size());
136    // Remove the first peer we added
137    admin.removeReplicationPeer(ID_ONE).join();
138    assertEquals(1, admin.listReplicationPeers().get().size());
139    admin.removeReplicationPeer(ID_TWO).join();
140    assertEquals(0, admin.listReplicationPeers().get().size());
141  }
142
143  @Test
144  public void testPeerConfig() throws Exception {
145    ReplicationPeerConfig config = new ReplicationPeerConfig();
146    config.setClusterKey(KEY_ONE);
147    config.getConfiguration().put("key1", "value1");
148    config.getConfiguration().put("key2", "value2");
149    admin.addReplicationPeer(ID_ONE, config).join();
150
151    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
152    assertEquals(1, peers.size());
153    ReplicationPeerDescription peerOne = peers.get(0);
154    assertNotNull(peerOne);
155    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
156    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
157
158    admin.removeReplicationPeer(ID_ONE).join();
159  }
160
161  @Test
162  public void testEnableDisablePeer() throws Exception {
163    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
164    rpc1.setClusterKey(KEY_ONE);
165    admin.addReplicationPeer(ID_ONE, rpc1).join();
166    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
167    assertEquals(1, peers.size());
168    assertTrue(peers.get(0).isEnabled());
169
170    admin.disableReplicationPeer(ID_ONE).join();
171    peers = admin.listReplicationPeers().get();
172    assertEquals(1, peers.size());
173    assertFalse(peers.get(0).isEnabled());
174    admin.removeReplicationPeer(ID_ONE).join();
175  }
176
177  @Test
178  public void testAppendPeerTableCFs() throws Exception {
179    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
180    rpc1.setClusterKey(KEY_ONE);
181    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
182    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
183    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
184    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
185    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
186    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
187
188    // Add a valid peer
189    admin.addReplicationPeer(ID_ONE, rpc1).join();
190    rpc1.setReplicateAllUserTables(false);
191    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
192
193    Map<TableName, List<String>> tableCFs = new HashMap<>();
194
195    // append table t1 to replication
196    tableCFs.put(tableName1, null);
197    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
198    Map<TableName, List<String>> result =
199      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
200    assertEquals(1, result.size());
201    assertEquals(true, result.containsKey(tableName1));
202    assertNull(result.get(tableName1));
203
204    // append table t2 to replication
205    tableCFs.clear();
206    tableCFs.put(tableName2, null);
207    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
208    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
209    assertEquals(2, result.size());
210    assertTrue("Should contain t1", result.containsKey(tableName1));
211    assertTrue("Should contain t2", result.containsKey(tableName2));
212    assertNull(result.get(tableName1));
213    assertNull(result.get(tableName2));
214
215    // append table column family: f1 of t3 to replication
216    tableCFs.clear();
217    tableCFs.put(tableName3, new ArrayList<>());
218    tableCFs.get(tableName3).add("f1");
219    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
220    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
221    assertEquals(3, result.size());
222    assertTrue("Should contain t1", result.containsKey(tableName1));
223    assertTrue("Should contain t2", result.containsKey(tableName2));
224    assertTrue("Should contain t3", result.containsKey(tableName3));
225    assertNull(result.get(tableName1));
226    assertNull(result.get(tableName2));
227    assertEquals(1, result.get(tableName3).size());
228    assertEquals("f1", result.get(tableName3).get(0));
229
230    // append table column family: f1,f2 of t4 to replication
231    tableCFs.clear();
232    tableCFs.put(tableName4, new ArrayList<>());
233    tableCFs.get(tableName4).add("f1");
234    tableCFs.get(tableName4).add("f2");
235    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
236    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
237    assertEquals(4, result.size());
238    assertTrue("Should contain t1", result.containsKey(tableName1));
239    assertTrue("Should contain t2", result.containsKey(tableName2));
240    assertTrue("Should contain t3", result.containsKey(tableName3));
241    assertTrue("Should contain t4", result.containsKey(tableName4));
242    assertNull(result.get(tableName1));
243    assertNull(result.get(tableName2));
244    assertEquals(1, result.get(tableName3).size());
245    assertEquals("f1", result.get(tableName3).get(0));
246    assertEquals(2, result.get(tableName4).size());
247    assertEquals("f1", result.get(tableName4).get(0));
248    assertEquals("f2", result.get(tableName4).get(1));
249
250    // append "table5" => [], then append "table5" => ["f1"]
251    tableCFs.clear();
252    tableCFs.put(tableName5, new ArrayList<>());
253    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
254    tableCFs.clear();
255    tableCFs.put(tableName5, new ArrayList<>());
256    tableCFs.get(tableName5).add("f1");
257    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
258    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
259    assertEquals(5, result.size());
260    assertTrue("Should contain t5", result.containsKey(tableName5));
261    // null means replication all cfs of tab5
262    assertNull(result.get(tableName5));
263
264    // append "table6" => ["f1"], then append "table6" => []
265    tableCFs.clear();
266    tableCFs.put(tableName6, new ArrayList<>());
267    tableCFs.get(tableName6).add("f1");
268    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
269    tableCFs.clear();
270    tableCFs.put(tableName6, new ArrayList<>());
271    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
272    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
273    assertEquals(6, result.size());
274    assertTrue("Should contain t6", result.containsKey(tableName6));
275    // null means replication all cfs of tab6
276    assertNull(result.get(tableName6));
277
278    admin.removeReplicationPeer(ID_ONE).join();
279  }
280
281  @Test
282  public void testRemovePeerTableCFs() throws Exception {
283    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
284    rpc1.setClusterKey(KEY_ONE);
285    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
286    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
287    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
288    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
289    // Add a valid peer
290    admin.addReplicationPeer(ID_ONE, rpc1).join();
291    rpc1.setReplicateAllUserTables(false);
292    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
293
294    Map<TableName, List<String>> tableCFs = new HashMap<>();
295    try {
296      tableCFs.put(tableName3, null);
297      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
298      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
299    } catch (CompletionException e) {
300      assertTrue(e.getCause() instanceof ReplicationException);
301    }
302    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
303
304    tableCFs.clear();
305    tableCFs.put(tableName1, null);
306    tableCFs.put(tableName2, new ArrayList<>());
307    tableCFs.get(tableName2).add("cf1");
308    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
309    try {
310      tableCFs.clear();
311      tableCFs.put(tableName3, null);
312      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
313      fail("Test case should fail as removing table-cfs from a peer whose" +
314        " table-cfs didn't contain t3");
315    } catch (CompletionException e) {
316      assertTrue(e.getCause() instanceof ReplicationException);
317    }
318    Map<TableName, List<String>> result =
319      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
320    assertEquals(2, result.size());
321    assertTrue("Should contain t1", result.containsKey(tableName1));
322    assertTrue("Should contain t2", result.containsKey(tableName2));
323    assertNull(result.get(tableName1));
324    assertEquals(1, result.get(tableName2).size());
325    assertEquals("cf1", result.get(tableName2).get(0));
326
327    try {
328      tableCFs.clear();
329      tableCFs.put(tableName1, new ArrayList<>());
330      tableCFs.get(tableName1).add("cf1");
331      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
332      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
333    } catch (CompletionException e) {
334      assertTrue(e.getCause() instanceof ReplicationException);
335    }
336    tableCFs.clear();
337    tableCFs.put(tableName1, null);
338    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
339    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
340    assertEquals(1, result.size());
341    assertEquals(1, result.get(tableName2).size());
342    assertEquals("cf1", result.get(tableName2).get(0));
343
344    try {
345      tableCFs.clear();
346      tableCFs.put(tableName2, null);
347      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
348      fail("Test case should fail, because table t2 hase specified cfs in peer config");
349    } catch (CompletionException e) {
350      assertTrue(e.getCause() instanceof ReplicationException);
351    }
352    tableCFs.clear();
353    tableCFs.put(tableName2, new ArrayList<>());
354    tableCFs.get(tableName2).add("cf1");
355    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
356    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
357
358    tableCFs.clear();
359    tableCFs.put(tableName4, new ArrayList<>());
360    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
361    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
362    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
363
364    admin.removeReplicationPeer(ID_ONE);
365  }
366
367  @Test
368  public void testSetPeerNamespaces() throws Exception {
369    String ns1 = "ns1";
370    String ns2 = "ns2";
371
372    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
373    rpc.setClusterKey(KEY_ONE);
374    admin.addReplicationPeer(ID_ONE, rpc).join();
375    rpc.setReplicateAllUserTables(false);
376    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
377
378    // add ns1 and ns2 to peer config
379    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
380    Set<String> namespaces = new HashSet<>();
381    namespaces.add(ns1);
382    namespaces.add(ns2);
383    rpc.setNamespaces(namespaces);
384    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
385    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
386    assertEquals(2, namespaces.size());
387    assertTrue(namespaces.contains(ns1));
388    assertTrue(namespaces.contains(ns2));
389
390    // update peer config only contains ns1
391    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
392    namespaces = new HashSet<>();
393    namespaces.add(ns1);
394    rpc.setNamespaces(namespaces);
395    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
396    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
397    assertEquals(1, namespaces.size());
398    assertTrue(namespaces.contains(ns1));
399
400    admin.removeReplicationPeer(ID_ONE).join();
401  }
402
403  @Test
404  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
405    String ns1 = "ns1";
406    String ns2 = "ns2";
407    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
408    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
409
410    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
411    rpc.setClusterKey(KEY_ONE);
412    admin.addReplicationPeer(ID_ONE, rpc).join();
413    rpc.setReplicateAllUserTables(false);
414    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
415
416    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
417    Set<String> namespaces = new HashSet<String>();
418    namespaces.add(ns1);
419    rpc.setNamespaces(namespaces);
420    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
421    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
422    Map<TableName, List<String>> tableCfs = new HashMap<>();
423    tableCfs.put(tableName1, new ArrayList<>());
424    rpc.setTableCFsMap(tableCfs);
425    try {
426      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
427      fail(
428        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
429    } catch (CompletionException e) {
430      // OK
431    }
432
433    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
434    tableCfs.clear();
435    tableCfs.put(tableName2, new ArrayList<>());
436    rpc.setTableCFsMap(tableCfs);
437    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
438    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
439    namespaces.clear();
440    namespaces.add(ns2);
441    rpc.setNamespaces(namespaces);
442    try {
443      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
444      fail(
445        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
446    } catch (CompletionException e) {
447      // OK
448    }
449
450    admin.removeReplicationPeer(ID_ONE).join();
451  }
452
453  @Test
454  public void testPeerBandwidth() throws Exception {
455    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
456    rpc.setClusterKey(KEY_ONE);
457
458    admin.addReplicationPeer(ID_ONE, rpc).join();
459    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
460    assertEquals(0, rpc.getBandwidth());
461
462    rpc.setBandwidth(2097152);
463    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
464    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
465
466    admin.removeReplicationPeer(ID_ONE).join();
467  }
468
469  @Test
470  public void testInvalidClusterKey() throws InterruptedException {
471    try {
472      admin.addReplicationPeer(ID_ONE,
473        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
474      fail();
475    } catch (ExecutionException e) {
476      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
477    }
478  }
479
480  @Test
481  public void testInvalidReplicationEndpoint() throws InterruptedException {
482    try {
483      admin.addReplicationPeer(ID_ONE,
484        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
485      fail();
486    } catch (ExecutionException e) {
487      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
488      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
489    }
490  }
491
492  @Test
493  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
494    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
495    admin
496      .addReplicationPeer(ID_ONE,
497        ReplicationPeerConfig.newBuilder()
498          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
499      .get();
500
501    // but we still need to check cluster key if we specify the default ReplicationEndpoint
502    try {
503      admin
504        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
505          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
506        .get();
507      fail();
508    } catch (ExecutionException e) {
509      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
510    }
511  }
512}