001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.UUID;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HColumnDescriptor;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.HTableDescriptor;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.TableNotFoundException;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
043import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
044import org.apache.hadoop.hbase.replication.TestReplicationBase;
045import org.apache.hadoop.hbase.testclassification.ClientTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.junit.AfterClass;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Rule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.rules.TestName;
054
055/**
056 * Unit testing of ReplicationAdmin with clusters
057 */
058@Category({ MediumTests.class, ClientTests.class })
059public class TestReplicationAdminWithClusters extends TestReplicationBase {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063      HBaseClassTestRule.forClass(TestReplicationAdminWithClusters.class);
064
065  static Connection connection1;
066  static Connection connection2;
067  static Admin admin1;
068  static Admin admin2;
069  static ReplicationAdmin adminExt;
070
071  @Rule
072  public TestName name = new TestName();
073
074  @BeforeClass
075  public static void setUpBeforeClass() throws Exception {
076    TestReplicationBase.setUpBeforeClass();
077    connection1 = ConnectionFactory.createConnection(conf1);
078    connection2 = ConnectionFactory.createConnection(conf2);
079    admin1 = connection1.getAdmin();
080    admin2 = connection2.getAdmin();
081    adminExt = new ReplicationAdmin(conf1);
082  }
083
084  @AfterClass
085  public static void tearDownAfterClass() throws Exception {
086    admin1.close();
087    admin2.close();
088    adminExt.close();
089    connection1.close();
090    connection2.close();
091    TestReplicationBase.tearDownAfterClass();
092  }
093
094  @Test
095  public void disableNotFullReplication() throws Exception {
096    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
097    HColumnDescriptor f = new HColumnDescriptor("notReplicatedFamily");
098    table.addFamily(f);
099    admin1.disableTable(tableName);
100    admin1.modifyTable(tableName, table);
101    admin1.enableTable(tableName);
102
103    admin1.disableTableReplication(tableName);
104    table = admin1.getTableDescriptor(tableName);
105    for (HColumnDescriptor fam : table.getColumnFamilies()) {
106      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
107    }
108
109    admin1.deleteColumnFamily(table.getTableName(), f.getName());
110  }
111
112  @Test
113  public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
114    admin1.disableTableReplication(tableName);
115    admin2.disableTable(tableName);
116    admin2.deleteTable(tableName);
117    assertFalse(admin2.tableExists(tableName));
118    admin1.enableTableReplication(tableName);
119    assertTrue(admin2.tableExists(tableName));
120  }
121
122  @Test
123  public void testEnableReplicationWhenReplicationNotEnabled() throws Exception {
124    HTableDescriptor table = new HTableDescriptor(admin1.getTableDescriptor(tableName));
125    for (HColumnDescriptor fam : table.getColumnFamilies()) {
126      fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
127    }
128    admin1.disableTable(tableName);
129    admin1.modifyTable(tableName, table);
130    admin1.enableTable(tableName);
131
132    admin2.disableTable(tableName);
133    admin2.modifyTable(tableName, table);
134    admin2.enableTable(tableName);
135
136    admin1.enableTableReplication(tableName);
137    table = admin1.getTableDescriptor(tableName);
138    for (HColumnDescriptor fam : table.getColumnFamilies()) {
139      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
140    }
141  }
142
143  @Test
144  public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
145    HTableDescriptor table = new HTableDescriptor(admin2.getTableDescriptor(tableName));
146    HColumnDescriptor f = new HColumnDescriptor("newFamily");
147    table.addFamily(f);
148    admin2.disableTable(tableName);
149    admin2.modifyTable(tableName, table);
150    admin2.enableTable(tableName);
151
152    try {
153      admin1.enableTableReplication(tableName);
154      fail("Exception should be thrown if table descriptors in the clusters are not same.");
155    } catch (RuntimeException ignored) {
156
157    }
158    admin1.disableTable(tableName);
159    admin1.modifyTable(tableName, table);
160    admin1.enableTable(tableName);
161    admin1.enableTableReplication(tableName);
162    table = admin1.getTableDescriptor(tableName);
163    for (HColumnDescriptor fam : table.getColumnFamilies()) {
164      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
165    }
166
167    admin1.deleteColumnFamily(tableName, f.getName());
168    admin2.deleteColumnFamily(tableName, f.getName());
169  }
170
171  @Test
172  public void testDisableAndEnableReplication() throws Exception {
173    admin1.disableTableReplication(tableName);
174    HTableDescriptor table = admin1.getTableDescriptor(tableName);
175    for (HColumnDescriptor fam : table.getColumnFamilies()) {
176      assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
177    }
178    admin1.enableTableReplication(tableName);
179    table = admin1.getTableDescriptor(tableName);
180    for (HColumnDescriptor fam : table.getColumnFamilies()) {
181      assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
182    }
183  }
184
185  @Test
186  public void testEnableReplicationForTableWithRegionReplica() throws Exception {
187    TableName tn = TableName.valueOf(name.getMethodName());
188    TableDescriptor td = TableDescriptorBuilder.newBuilder(tn)
189        .setRegionReplication(5)
190        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).build())
191        .build();
192
193    admin1.createTable(td);
194
195    try {
196      admin1.enableTableReplication(tn);
197      td = admin1.getDescriptor(tn);
198      for (ColumnFamilyDescriptor fam : td.getColumnFamilies()) {
199        assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
200      }
201    } finally {
202      utility1.deleteTable(tn);
203      utility2.deleteTable(tn);
204    }
205  }
206
207  @Test(expected = TableNotFoundException.class)
208  public void testDisableReplicationForNonExistingTable() throws Exception {
209    admin1.disableTableReplication(TableName.valueOf(name.getMethodName()));
210  }
211
212  @Test(expected = TableNotFoundException.class)
213  public void testEnableReplicationForNonExistingTable() throws Exception {
214    admin1.enableTableReplication(TableName.valueOf(name.getMethodName()));
215  }
216
217  @Test(expected = IllegalArgumentException.class)
218  public void testDisableReplicationWhenTableNameAsNull() throws Exception {
219    admin1.disableTableReplication(null);
220  }
221
222  @Test(expected = IllegalArgumentException.class)
223  public void testEnableReplicationWhenTableNameAsNull() throws Exception {
224    admin1.enableTableReplication(null);
225  }
226
227  /*
228   * Test enable table replication should create table only in user explicit specified table-cfs.
229   * HBASE-14717
230   */
231  @Test
232  public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
233    final TableName tableName = TableName.valueOf(name.getMethodName());
234    String peerId = "2";
235    if (admin2.isTableAvailable(TestReplicationBase.tableName)) {
236      admin2.disableTable(TestReplicationBase.tableName);
237      admin2.deleteTable(TestReplicationBase.tableName);
238    }
239    assertFalse("Table should not exists in the peer cluster",
240      admin2.isTableAvailable(TestReplicationBase.tableName));
241
242    // update peer config
243    ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
244    rpc.setReplicateAllUserTables(false);
245    admin1.updateReplicationPeerConfig(peerId, rpc);
246
247    Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
248    tableCfs.put(tableName, null);
249    try {
250      adminExt.setPeerTableCFs(peerId, tableCfs);
251      admin1.enableTableReplication(TestReplicationBase.tableName);
252      assertFalse("Table should not be created if user has set table cfs explicitly for the "
253          + "peer and this is not part of that collection",
254        admin2.isTableAvailable(TestReplicationBase.tableName));
255
256      tableCfs.put(TestReplicationBase.tableName, null);
257      adminExt.setPeerTableCFs(peerId, tableCfs);
258      admin1.enableTableReplication(TestReplicationBase.tableName);
259      assertTrue(
260        "Table should be created if user has explicitly added table into table cfs collection",
261        admin2.isTableAvailable(TestReplicationBase.tableName));
262    } finally {
263      adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
264      admin1.disableTableReplication(TestReplicationBase.tableName);
265
266      rpc = admin1.getReplicationPeerConfig(peerId);
267      rpc.setReplicateAllUserTables(true);
268      admin1.updateReplicationPeerConfig(peerId, rpc);
269    }
270  }
271
272  @Test
273  public void testReplicationPeerConfigUpdateCallback() throws Exception {
274    String peerId = "1";
275    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
276    rpc.setClusterKey(utility2.getClusterKey());
277    rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
278    rpc.getConfiguration().put("key1", "value1");
279
280    admin1.addReplicationPeer(peerId, rpc);
281
282    rpc.getConfiguration().put("key1", "value2");
283    admin.updatePeerConfig(peerId, rpc);
284    if (!TestUpdatableReplicationEndpoint.hasCalledBack()) {
285      synchronized (TestUpdatableReplicationEndpoint.class) {
286        TestUpdatableReplicationEndpoint.class.wait(2000L);
287      }
288    }
289
290    assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack());
291
292    admin.removePeer(peerId);
293  }
294
295  public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {
296    private static boolean calledBack = false;
297    public static boolean hasCalledBack(){
298      return calledBack;
299    }
300    @Override
301    public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc){
302      calledBack = true;
303      notifyAll();
304    }
305
306    @Override
307    public void start() {
308      startAsync();
309    }
310
311    @Override
312    public void stop() {
313      stopAsync();
314    }
315
316    @Override
317    protected void doStart() {
318      notifyStarted();
319    }
320
321    @Override
322    protected void doStop() {
323      notifyStopped();
324    }
325
326    @Override
327    public UUID getPeerUUID() {
328      return utility1.getRandomUUID();
329    }
330
331    @Override
332    public boolean replicate(ReplicateContext replicateContext) {
333      return false;
334    }
335  }
336}