001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.UUID;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HColumnDescriptor;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.HTableDescriptor;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.TableNotFoundException;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
043import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
044import org.apache.hadoop.hbase.replication.TestReplicationBase;
045import org.apache.hadoop.hbase.testclassification.ClientTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.junit.AfterClass;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Rule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.rules.TestName;
054
055/**
056 * Unit testing of ReplicationAdmin with clusters
057 */
058@Category({ MediumTests.class, ClientTests.class })
059public class TestReplicationAdminWithClusters extends TestReplicationBase {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestReplicationAdminWithClusters.class);
064
065  static Connection connection1;
066  static Connection connection2;
067  static Admin admin1;
068  static Admin admin2;
069  static ReplicationAdmin adminExt;
070
071  @Rule
072  public TestName name = new TestName();
073
074  @BeforeClass
075  public static void setUpBeforeClass() throws Exception {
076    TestReplicationBase.setUpBeforeClass();
077    connection1 = ConnectionFactory.createConnection(CONF1);
078    connection2 = ConnectionFactory.createConnection(CONF2);
079    admin1 = connection1.getAdmin();
080    admin2 = connection2.getAdmin();
081    adminExt = new ReplicationAdmin(CONF1);
082  }
083
084  @AfterClass
085  public static void tearDownAfterClass() throws Exception {
086    admin1.close();
087    admin2.close();
088    adminExt.close();
089    connection1.close();
090    connection2.close();
091    TestReplicationBase.tearDownAfterClass();
092  }
093
094  @Test
095  public void disableNotFullReplication() throws Exception {
096    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
097    HColumnDescriptor f = new HColumnDescriptor("notReplicatedFamily");
098    table.addFamily(f);
099    admin1.disableTable(tableName);
100    admin1.modifyTable(tableName, table);
101    admin1.enableTable(tableName);
102
103    admin1.disableTableReplication(tableName);
104    table = admin1.getTableDescriptor(tableName);
105    for (HColumnDescriptor fam : table.getColumnFamilies()) {
106      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
107    }
108
109    admin1.deleteColumnFamily(table.getTableName(), f.getName());
110  }
111
112  @Test
113  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
114    admin1.disableTableReplication(tableName);
115    admin2.disableTable(tableName);
116    admin2.deleteTable(tableName);
117    assertFalse(admin2.tableExists(tableName));
118    admin1.enableTableReplication(tableName);
119    assertTrue(admin2.tableExists(tableName));
120  }
121
122  @Test
123  public void testEnableReplicationWhenReplicationNotEnabled() throws Exception {
124    HTableDescriptor table = new HTableDescriptor(admin1.getTableDescriptor(tableName));
125    for (HColumnDescriptor fam : table.getColumnFamilies()) {
126      fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
127    }
128    admin1.disableTable(tableName);
129    admin1.modifyTable(tableName, table);
130    admin1.enableTable(tableName);
131
132    admin2.disableTable(tableName);
133    admin2.modifyTable(tableName, table);
134    admin2.enableTable(tableName);
135
136    admin1.enableTableReplication(tableName);
137    table = admin1.getTableDescriptor(tableName);
138    for (HColumnDescriptor fam : table.getColumnFamilies()) {
139      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
140    }
141  }
142
143  @Test
144  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
145    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
146    HColumnDescriptor f = new HColumnDescriptor("newFamily");
147    table.addFamily(f);
148    admin2.disableTable(tableName);
149    admin2.modifyTable(tableName, table);
150    admin2.enableTable(tableName);
151
152    try {
153      admin1.enableTableReplication(tableName);
154      fail("Exception should be thrown if table descriptors in the clusters are not same.");
155    } catch (RuntimeException ignored) {
156
157    }
158    admin1.disableTable(tableName);
159    admin1.modifyTable(tableName, table);
160    admin1.enableTable(tableName);
161    admin1.enableTableReplication(tableName);
162    table = admin1.getTableDescriptor(tableName);
163    for (HColumnDescriptor fam : table.getColumnFamilies()) {
164      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
165    }
166
167    admin1.deleteColumnFamily(tableName, f.getName());
168    admin2.deleteColumnFamily(tableName, f.getName());
169  }
170
171  @Test
172  public void testDisableAndEnableReplication() throws Exception {
173    admin1.disableTableReplication(tableName);
174    HTableDescriptor table = admin1.getTableDescriptor(tableName);
175    for (HColumnDescriptor fam : table.getColumnFamilies()) {
176      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
177    }
178    admin1.enableTableReplication(tableName);
179    table = admin1.getTableDescriptor(tableName);
180    for (HColumnDescriptor fam : table.getColumnFamilies()) {
181      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
182    }
183  }
184
185  @Test
186  public void testEnableReplicationForTableWithRegionReplica() throws Exception {
187    TableName tn = TableName.valueOf(name.getMethodName());
188    TableDescriptor td = TableDescriptorBuilder.newBuilder(tn).setRegionReplication(5)
189      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).build()).build();
190
191    admin1.createTable(td);
192
193    try {
194      admin1.enableTableReplication(tn);
195      td = admin1.getDescriptor(tn);
196      for (ColumnFamilyDescriptor fam : td.getColumnFamilies()) {
197        assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
198      }
199    } finally {
200      UTIL1.deleteTable(tn);
201      UTIL2.deleteTable(tn);
202    }
203  }
204
205  @Test(expected = TableNotFoundException.class)
206  public void testDisableReplicationForNonExistingTable() throws Exception {
207    admin1.disableTableReplication(TableName.valueOf(name.getMethodName()));
208  }
209
210  @Test(expected = TableNotFoundException.class)
211  public void testEnableReplicationForNonExistingTable() throws Exception {
212    admin1.enableTableReplication(TableName.valueOf(name.getMethodName()));
213  }
214
215  @Test(expected = IllegalArgumentException.class)
216  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
217    admin1.disableTableReplication(null);
218  }
219
220  @Test(expected = IllegalArgumentException.class)
221  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
222    admin1.enableTableReplication(null);
223  }
224
225  /*
226   * Test enable table replication should create table only in user explicit specified table-cfs.
227   * HBASE-14717
228   */
229  @Test
230  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
231    final TableName tableName = TableName.valueOf(name.getMethodName());
232    String peerId = "2";
233    if (admin2.isTableAvailable(TestReplicationBase.tableName)) {
234      admin2.disableTable(TestReplicationBase.tableName);
235      admin2.deleteTable(TestReplicationBase.tableName);
236    }
237    assertFalse("Table should not exists in the peer cluster",
238      admin2.isTableAvailable(TestReplicationBase.tableName));
239
240    // update peer config
241    ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
242    rpc.setReplicateAllUserTables(false);
243    admin1.updateReplicationPeerConfig(peerId, rpc);
244
245    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
246    tableCfs.put(tableName, null);
247    try {
248      adminExt.setPeerTableCFs(peerId, tableCfs);
249      admin1.enableTableReplication(TestReplicationBase.tableName);
250      assertFalse(
251        "Table should not be created if user has set table cfs explicitly for the "
252          + "peer and this is not part of that collection",
253        admin2.isTableAvailable(TestReplicationBase.tableName));
254
255      tableCfs.put(TestReplicationBase.tableName, null);
256      adminExt.setPeerTableCFs(peerId, tableCfs);
257      admin1.enableTableReplication(TestReplicationBase.tableName);
258      assertTrue(
259        "Table should be created if user has explicitly added table into table cfs collection",
260        admin2.isTableAvailable(TestReplicationBase.tableName));
261    } finally {
262      adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
263      admin1.disableTableReplication(TestReplicationBase.tableName);
264
265      rpc = admin1.getReplicationPeerConfig(peerId);
266      rpc.setReplicateAllUserTables(true);
267      admin1.updateReplicationPeerConfig(peerId, rpc);
268    }
269  }
270
271  @Test
272  public void testReplicationPeerConfigUpdateCallback() throws Exception {
273    String peerId = "1";
274    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
275    rpc.setClusterKey(UTIL2.getClusterKey());
276    rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
277    rpc.getConfiguration().put("key1", "value1");
278
279    admin1.addReplicationPeer(peerId, rpc);
280
281    rpc.getConfiguration().put("key1", "value2");
282    admin.updatePeerConfig(peerId, rpc);
283    if (!TestUpdatableReplicationEndpoint.hasCalledBack()) {
284      synchronized (TestUpdatableReplicationEndpoint.class) {
285        TestUpdatableReplicationEndpoint.class.wait(2000L);
286      }
287    }
288
289    assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack());
290
291    admin.removePeer(peerId);
292  }
293
294  public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {
295    private static boolean calledBack = false;
296
297    public static boolean hasCalledBack() {
298      return calledBack;
299    }
300
301    @Override
302    public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc) {
303      calledBack = true;
304      notifyAll();
305    }
306
307    @Override
308    public void start() {
309      startAsync();
310    }
311
312    @Override
313    public void stop() {
314      stopAsync();
315    }
316
317    @Override
318    protected void doStart() {
319      notifyStarted();
320    }
321
322    @Override
323    protected void doStop() {
324      notifyStopped();
325    }
326
327    @Override
328    public UUID getPeerUUID() {
329      return UTIL1.getRandomUUID();
330    }
331
332    @Override
333    public boolean replicate(ReplicateContext replicateContext) {
334      return false;
335    }
336  }
337}