/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Class to test asynchronous replication admin operations when more than one cluster is involved.
 */
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApiWithClusters.class);

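  // Id of the replication peer pointing at the second cluster, and the resources of that cluster.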
  private final static String ID_SECOND = "2";

  private static HBaseTestingUtil TEST_UTIL2;
  private static Configuration conf2;
  private static AsyncAdmin admin2;
  private static AsyncConnection connection;

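  /**
   * Starts two mini clusters (the second under a separate ZooKeeper znode parent) and registers
   * the second cluster as replication peer {@code ID_SECOND} of the first.
   */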
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
    TEST_UTIL.startMiniCluster();
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();

    conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    TEST_UTIL2 = new HBaseTestingUtil(conf2);
    TEST_UTIL2.startMiniCluster();

    connection = ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get();
    admin2 = connection.getAdmin();

    ReplicationPeerConfig rpc =
      ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL2.getRpcConnnectionURI()).build();
    ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
  }

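  /** Closes the async connection that was opened to the second cluster in setUpBeforeClass. */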
  @AfterClass
  public static void clearUp() throws IOException {
    connection.close();
  }

  @Override
  @After
  public void tearDown() throws Exception {
    Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
    cleanupTables(admin, pattern);
    cleanupTables(admin2, pattern);
  }

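  /** Disables and deletes, on the given cluster, every table whose name matches the pattern. */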
  private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
    admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
      if (tables != null) {
        tables.forEach(table -> {
          try {
            admin.disableTable(table).join();
          } catch (Exception e) {
            LOG.debug("Table: " + table + " already disabled, so just deleting it.");
          }
          admin.deleteTable(table).join();
        });
      }
    }, ForkJoinPool.commonPool()).join();
  }

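  /** Creates a table with a single column family {@code FAMILY} through the given admin. */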
  private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
    admin.createTable(builder.build()).join();
  }

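  /**
   * Enabling table replication should switch the replication scope of every column family to
   * global, and disabling it should switch the scope back to local.
   */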
  @Test
  public void testEnableAndDisableTableReplication() throws Exception {
    // default replication scope is local
    createTableWithDefaultConf(tableName);
    admin.enableTableReplication(tableName).join();
    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }

    admin.disableTableReplication(tableName).join();
    tableDesc = admin.getDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
    }
  }

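  /**
   * Enabling table replication should create the table in the peer cluster if it does not exist
   * there yet.
   */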
  @Test
  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
    // Only create table in source cluster
    createTableWithDefaultConf(tableName);
    assertFalse(admin2.tableExists(tableName).get());
    admin.enableTableReplication(tableName).join();
    assertTrue(admin2.tableExists(tableName).get());
  }

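  /**
   * Enabling table replication should fail while the table descriptors in the two clusters
   * differ, and succeed once they match again.
   */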
  @Test
  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
    createTableWithDefaultConf(admin, tableName);
    createTableWithDefaultConf(admin2, tableName);
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName).get());
    builder.setColumnFamily(
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily")).build());
    admin2.disableTable(tableName).join();
    admin2.modifyTable(builder.build()).join();
    admin2.enableTable(tableName).join();

    try {
      admin.enableTableReplication(tableName).join();
      fail("Exception should be thrown if table descriptors in the clusters are not the same.");
    } catch (Exception ignored) {
      // ok
    }

    admin.disableTable(tableName).join();
    admin.modifyTable(builder.build()).join();
    admin.enableTable(tableName).join();
    admin.enableTableReplication(tableName).join();
    TableDescriptor tableDesc = admin.getDescriptor(tableName).get();
    for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
    }
  }

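  /**
   * Disabling replication for a table that does not exist should fail with TableNotFoundException.
   */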
  @Test
  public void testDisableReplicationForNonExistingTable() throws Exception {
    try {
      admin.disableTableReplication(tableName).join();
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof TableNotFoundException);
    }
  }

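  /**
   * Enabling replication for a table that does not exist should fail with TableNotFoundException.
   */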
  @Test
  public void testEnableReplicationForNonExistingTable() throws Exception {
    try {
      admin.enableTableReplication(tableName).join();
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof TableNotFoundException);
    }
  }

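  /** Disabling replication with a null table name should fail with IllegalArgumentException. */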
  @Test
  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
    try {
      admin.disableTableReplication(null).join();
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof IllegalArgumentException);
    }
  }

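  /** Enabling replication with a null table name should fail with IllegalArgumentException. */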
  @Test
  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
    try {
      admin.enableTableReplication(null).join();
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof IllegalArgumentException);
    }
  }

  /*
   * Test that enabling table replication only creates the table in the peer cluster when it is
   * part of the user's explicitly specified table-CFs map. HBASE-14717
   */
  @Test
  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
    TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
    // Only create the tables in the source cluster
    createTableWithDefaultConf(tableName);
    createTableWithDefaultConf(tableName2);
    assertFalse("Table should not exist in the peer cluster", admin2.tableExists(tableName).get());
    assertFalse("Table should not exist in the peer cluster",
      admin2.tableExists(tableName2).get());

    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName, null);
    ReplicationPeerConfigBuilder rpcBuilder =
      ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig(ID_SECOND).get())
        .setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    try {
      // Only add tableName to replication peer config
      admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join();
      admin.enableTableReplication(tableName2).join();
      assertFalse("Table should not be created if the user has set table CFs explicitly for the "
        + "peer and this table is not part of that collection",
        admin2.tableExists(tableName2).get());

      // Add tableName2 to replication peer config, too
      tableCfs.put(tableName2, null);
      rpcBuilder.setTableCFsMap(tableCfs);
      admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join();
      admin.enableTableReplication(tableName2).join();
      assertTrue(
        "Table should be created if the user has explicitly added it to the table CFs collection",
        admin2.tableExists(tableName2).get());
    } finally {
      // Restore the default peer config so later tests are not affected.
      rpcBuilder.setTableCFsMap(null).setReplicateAllUserTables(true);
      admin.updateReplicationPeerConfig(ID_SECOND, rpcBuilder.build()).join();
    }
  }
}