001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028import org.apache.commons.lang3.RandomStringUtils;
029import org.apache.commons.lang3.RandomUtils;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.client.TableDescriptor;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
041import org.apache.hadoop.hbase.log.HBaseMarkers;
042import org.apache.hadoop.hbase.testclassification.IntegrationTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.HBaseFsck;
045import org.apache.hadoop.hbase.util.Threads;
046import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
047import org.apache.hadoop.util.ToolRunner;
048import org.junit.Assert;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 *
056 * Integration test that verifies Procedure V2.
057 *
058 * DDL operations should go through (rollforward or rollback) when primary master is killed by
059 * ChaosMonkey (default MASTER_KILLING).
060 *
 * <p>Multiple worker threads are started to randomly perform the following actions in loops:
062 * Actions generating and populating tables:
063 * <ul>
064 *     <li>CreateTableAction</li>
065 *     <li>DisableTableAction</li>
066 *     <li>EnableTableAction</li>
067 *     <li>DeleteTableAction</li>
068 *     <li>AddRowAction</li>
069 * </ul>
070 * Actions performing column family DDL operations:
071 * <ul>
072 *     <li>AddColumnFamilyAction</li>
 *     <li>AlterFamilyVersionsAction</li>
 *     <li>AlterFamilyEncodingAction</li>
075 *     <li>DeleteColumnFamilyAction</li>
076 * </ul>
077 * Actions performing namespace DDL operations:
078 * <ul>
 *     <li>CreateNamespaceAction</li>
 *     <li>ModifyNamespaceAction</li>
 *     <li>DeleteNamespaceAction</li>
082 * </ul>
 * <p>
 * The threads run for a period of time (default 20 minutes) and are then stopped at the end of
 * the runtime. Verification is performed against the following checkpoints:
087 * <ol>
088 *     <li>No Actions throw Exceptions.</li>
089 *     <li>No inconsistencies are detected in hbck.</li>
090 * </ol>
091 *
092 * <p>
093 * This test should be run by the hbase user since it invokes hbck at the end
094 * </p><p>
095 * Usage:
096 *  hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
097 *    -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
098 *    -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
099 *    -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
100 */
101
102@Category(IntegrationTests.class)
103public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {
104
  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestDDLMasterFailover.class);

  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster

  // Default run time in milliseconds (20 * 60 * 1000 ms = 20 minutes).
  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;

  // Default number of worker threads.
  protected static final int DEFAULT_NUM_THREADS = 20;

  protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables

  // When false, cleanUpCluster() disables/deletes the "ittable-<digits>" tables and deletes the
  // "itnamespace<digits>" namespaces created by this test.
  private boolean keepObjectsAtTheEnd = false;
  // Cluster handle obtained in setUpCluster().
  protected HBaseCluster cluster;

  // Shared client connection; created lazily by getConnection().
  protected Connection connection;

  /**
   * A soft limit on how long we should run
   */
  // The three keys below are String.format patterns; "%s" is replaced with a class simple name.
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";

  // NOTE(review): not referenced in the visible part of the file; presumably tells the worker
  // threads to keep looping while true — confirm against runTest().
  protected AtomicBoolean running = new AtomicBoolean(true);

  // NOTE(review): not referenced in the visible part of the file — confirm it is still used.
  protected AtomicBoolean create_table = new AtomicBoolean(true);

  // numRegions is (re)assigned by CreateTableAction before each table creation.
  protected int numThreads, numRegions;

  // Expected-state bookkeeping. Actions remove an entry while operating on it (see
  // selectNamespace/selectTable) and put it back into the map matching the new state on success;
  // verifyNamespaces()/verifyTables() check these maps against the cluster.
  ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> enabledTables = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> disabledTables = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> deletedTables = new ConcurrentHashMap<>();
140
141  @Override
142  public void setUpCluster() throws Exception {
143    util = getTestingUtil(getConf());
144    LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
145    util.initializeCluster(getMinServerCount());
146    LOG.debug("Done initializing/checking cluster");
147    cluster = util.getHBaseClusterInterface();
148  }
149
150  @Override
151  public void cleanUpCluster() throws Exception {
152    if (!keepObjectsAtTheEnd) {
153      Admin admin = util.getAdmin();
154      admin.disableTables("ittable-\\d+");
155      admin.deleteTables("ittable-\\d+");
156      NamespaceDescriptor [] nsds = admin.listNamespaceDescriptors();
157      for(NamespaceDescriptor nsd: nsds) {
158        if(nsd.getName().matches("itnamespace\\d+")) {
159          LOG.info("Removing namespace="+nsd.getName());
160          admin.deleteNamespace(nsd.getName());
161        }
162      }
163    }
164
165    enabledTables.clear();
166    disabledTables.clear();
167    deletedTables.clear();
168    namespaceMap.clear();
169
170    Connection connection = getConnection();
171    connection.close();
172    super.cleanUpCluster();
173  }
174
175  protected int getMinServerCount() {
176    return SERVER_COUNT;
177  }
178
179  protected synchronized void setConnection(Connection connection){
180    this.connection = connection;
181  }
182
183  protected synchronized Connection getConnection(){
184    if (this.connection == null) {
185      try {
186        Connection connection = ConnectionFactory.createConnection(getConf());
187        setConnection(connection);
188      } catch (IOException e) {
189        LOG.error(HBaseMarkers.FATAL, "Failed to establish connection.", e);
190      }
191    }
192    return connection;
193  }
194
195  protected void verifyNamespaces() throws  IOException{
196    Connection connection = getConnection();
197    Admin admin = connection.getAdmin();
198    // iterating concurrent map
199    for (String nsName : namespaceMap.keySet()){
200      try {
201        Assert.assertTrue(
202          "Namespace: " + nsName + " in namespaceMap does not exist",
203          admin.getNamespaceDescriptor(nsName) != null);
204      } catch (NamespaceNotFoundException nsnfe) {
205        Assert.fail(
206          "Namespace: " + nsName + " in namespaceMap does not exist: " + nsnfe.getMessage());
207      }
208    }
209    admin.close();
210  }
211
212  protected void verifyTables() throws  IOException{
213    Connection connection = getConnection();
214    Admin admin = connection.getAdmin();
215    // iterating concurrent map
216    for (TableName tableName : enabledTables.keySet()){
217      Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
218          admin.isTableEnabled(tableName));
219    }
220    for (TableName tableName : disabledTables.keySet()){
221      Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
222          admin.isTableDisabled(tableName));
223    }
224    for (TableName tableName : deletedTables.keySet()){
225      Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
226          admin.tableExists(tableName));
227    }
228    admin.close();
229  }
230
231  @Test
232  public void testAsUnitTest() throws Exception {
233    runTest();
234  }
235
236  @Override
237  public int runTestFromCommandLine() throws Exception {
238    int ret = runTest();
239    return ret;
240  }
241
242  private abstract class MasterAction{
243    Connection connection = getConnection();
244
245    abstract void perform() throws IOException;
246  }
247
248  private abstract class NamespaceAction extends MasterAction {
249    final String nsTestConfigKey = "hbase.namespace.testKey";
250
251    // NamespaceAction has implemented selectNamespace() shared by multiple namespace Actions
252    protected NamespaceDescriptor selectNamespace(
253        ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap) {
254      // synchronization to prevent removal from multiple threads
255      synchronized (namespaceMap) {
256        // randomly select namespace from namespaceMap
257        if (namespaceMap.isEmpty()) {
258          return null;
259        }
260        ArrayList<String> namespaceList = new ArrayList<>(namespaceMap.keySet());
261        String randomKey = namespaceList.get(RandomUtils.nextInt(0, namespaceList.size()));
262        NamespaceDescriptor randomNsd = namespaceMap.get(randomKey);
263        // remove from namespaceMap
264        namespaceMap.remove(randomKey);
265        return randomNsd;
266      }
267    }
268  }
269
270  private class CreateNamespaceAction extends NamespaceAction {
271    @Override
272    void perform() throws IOException {
273      Admin admin = connection.getAdmin();
274      try {
275        NamespaceDescriptor nsd;
276        while (true) {
277          nsd = createNamespaceDesc();
278          try {
279            if (admin.getNamespaceDescriptor(nsd.getName()) != null) {
280              // the namespace has already existed.
281              continue;
282            } else {
283              // currently, the code never return null - always throws exception if
284              // namespace is not found - this just a defensive programming to make
285              // sure null situation is handled in case the method changes in the
286              // future.
287              break;
288            }
289          } catch (NamespaceNotFoundException nsnfe) {
290            // This is expected for a random generated NamespaceDescriptor
291            break;
292          }
293        }
294        LOG.info("Creating namespace:" + nsd);
295        admin.createNamespace(nsd);
296        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(nsd.getName());
297        Assert.assertTrue("Namespace: " + nsd + " was not created", freshNamespaceDesc != null);
298        namespaceMap.put(nsd.getName(), freshNamespaceDesc);
299        LOG.info("Created namespace:" + freshNamespaceDesc);
300      } catch (Exception e){
301        LOG.warn("Caught exception in action: " + this.getClass());
302        throw e;
303      } finally {
304        admin.close();
305      }
306    }
307
308    private NamespaceDescriptor createNamespaceDesc() {
309      String namespaceName = "itnamespace" + String.format("%010d",
310        RandomUtils.nextInt());
311      NamespaceDescriptor nsd = NamespaceDescriptor.create(namespaceName).build();
312
313      nsd.setConfiguration(
314        nsTestConfigKey,
315        String.format("%010d", RandomUtils.nextInt()));
316      return nsd;
317    }
318  }
319
320  private class ModifyNamespaceAction extends NamespaceAction {
321    @Override
322    void perform() throws IOException {
323      NamespaceDescriptor selected = selectNamespace(namespaceMap);
324      if (selected == null) {
325        return;
326      }
327
328      Admin admin = connection.getAdmin();
329      try {
330        String namespaceName = selected.getName();
331        LOG.info("Modifying namespace :" + selected);
332        NamespaceDescriptor modifiedNsd = NamespaceDescriptor.create(namespaceName).build();
333        String nsValueNew;
334        do {
335          nsValueNew = String.format("%010d", RandomUtils.nextInt());
336        } while (selected.getConfigurationValue(nsTestConfigKey).equals(nsValueNew));
337        modifiedNsd.setConfiguration(nsTestConfigKey, nsValueNew);
338        admin.modifyNamespace(modifiedNsd);
339        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(namespaceName);
340        Assert.assertTrue(
341          "Namespace: " + selected + " was not modified",
342          freshNamespaceDesc.getConfigurationValue(nsTestConfigKey).equals(nsValueNew));
343        Assert.assertTrue(
344          "Namespace: " + namespaceName + " does not exist",
345          admin.getNamespaceDescriptor(namespaceName) != null);
346        namespaceMap.put(namespaceName, freshNamespaceDesc);
347        LOG.info("Modified namespace :" + freshNamespaceDesc);
348      } catch (Exception e){
349        LOG.warn("Caught exception in action: " + this.getClass());
350        throw e;
351      } finally {
352        admin.close();
353      }
354    }
355  }
356
357  private class DeleteNamespaceAction extends NamespaceAction {
358    @Override
359    void perform() throws IOException {
360      NamespaceDescriptor selected = selectNamespace(namespaceMap);
361      if (selected == null) {
362        return;
363      }
364
365      Admin admin = connection.getAdmin();
366      try {
367        String namespaceName = selected.getName();
368        LOG.info("Deleting namespace :" + selected);
369        admin.deleteNamespace(namespaceName);
370        try {
371          if (admin.getNamespaceDescriptor(namespaceName) != null) {
372            // the namespace still exists.
373            Assert.assertTrue("Namespace: " + selected + " was not deleted", false);
374          } else {
375            LOG.info("Deleted namespace :" + selected);
376          }
377        } catch (NamespaceNotFoundException nsnfe) {
378          // This is expected result
379          LOG.info("Deleted namespace :" + selected);
380        }
381      } catch (Exception e){
382        LOG.warn("Caught exception in action: " + this.getClass());
383        throw e;
384      } finally {
385        admin.close();
386      }
387    }
388  }
389
390  private abstract class TableAction extends  MasterAction{
391    // TableAction has implemented selectTable() shared by multiple table Actions
392    protected TableDescriptor selectTable(ConcurrentHashMap<TableName, TableDescriptor> tableMap)
393    {
394      // synchronization to prevent removal from multiple threads
395      synchronized (tableMap){
396        // randomly select table from tableMap
397        if (tableMap.isEmpty()) {
398          return null;
399        }
400        ArrayList<TableName> tableList = new ArrayList<>(tableMap.keySet());
401        TableName randomKey = tableList.get(RandomUtils.nextInt(0, tableList.size()));
402        TableDescriptor randomTd = tableMap.remove(randomKey);
403        return randomTd;
404      }
405    }
406  }
407
408  private class CreateTableAction extends TableAction {
409
410    @Override
411    void perform() throws IOException {
412      Admin admin = connection.getAdmin();
413      try {
414        TableDescriptor td = createTableDesc();
415        TableName tableName = td.getTableName();
416        if ( admin.tableExists(tableName)){
417          return;
418        }
419        String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName());
420        numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
421        byte[] startKey = Bytes.toBytes("row-0000000000");
422        byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
423        LOG.info("Creating table:" + td);
424        admin.createTable(td, startKey, endKey, numRegions);
425        Assert.assertTrue("Table: " + td + " was not created", admin.tableExists(tableName));
426        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
427        Assert.assertTrue(
428          "After create, Table: " + tableName + " in not enabled", admin.isTableEnabled(tableName));
429        enabledTables.put(tableName, freshTableDesc);
430        LOG.info("Created table:" + freshTableDesc);
431      } catch (Exception e) {
432        LOG.warn("Caught exception in action: " + this.getClass());
433        throw e;
434      } finally {
435        admin.close();
436      }
437    }
438
439    private TableDescriptor createTableDesc() {
440      String tableName = String.format("ittable-%010d", RandomUtils.nextInt());
441      String familyName = "cf-" + Math.abs(RandomUtils.nextInt());
442      return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
443          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName))
444          .build();
445    }
446  }
447
448  private class DisableTableAction extends TableAction {
449
450    @Override
451    void perform() throws IOException {
452
453      TableDescriptor selected = selectTable(enabledTables);
454      if (selected == null) {
455        return;
456      }
457
458      Admin admin = connection.getAdmin();
459      try {
460        TableName tableName = selected.getTableName();
461        LOG.info("Disabling table :" + selected);
462        admin.disableTable(tableName);
463        Assert.assertTrue("Table: " + selected + " was not disabled",
464            admin.isTableDisabled(tableName));
465        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
466        Assert.assertTrue(
467          "After disable, Table: " + tableName + " is not disabled",
468          admin.isTableDisabled(tableName));
469        disabledTables.put(tableName, freshTableDesc);
470        LOG.info("Disabled table :" + freshTableDesc);
471      } catch (Exception e){
472        LOG.warn("Caught exception in action: " + this.getClass());
473        // TODO workaround
474        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
475        // operations
476        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
477        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
478        // 2) if master failover happens in the middle of the enable/disable operation, the new
479        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
480        // AssignmentManager#recoverTableInEnablingState() and
481        // AssignmentManager#recoverTableInDisablingState()
482        // 3) after the new master initialization completes, the procedure tries to re-do the
483        // enable/disable operation, which was already done. Ignore those exceptions before change
484        // of behaviors of AssignmentManager in presence of PV2
485        if (e instanceof TableNotEnabledException) {
486          LOG.warn("Caught TableNotEnabledException in action: " + this.getClass());
487          e.printStackTrace();
488        } else {
489          throw e;
490        }
491      } finally {
492        admin.close();
493      }
494    }
495  }
496
497  private class EnableTableAction extends TableAction {
498
499    @Override
500    void perform() throws IOException {
501
502      TableDescriptor selected = selectTable(disabledTables);
503      if (selected == null ) {
504        return;
505      }
506
507      Admin admin = connection.getAdmin();
508      try {
509        TableName tableName = selected.getTableName();
510        LOG.info("Enabling table :" + selected);
511        admin.enableTable(tableName);
512        Assert.assertTrue("Table: " + selected + " was not enabled",
513            admin.isTableEnabled(tableName));
514        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
515        Assert.assertTrue(
516          "After enable, Table: " + tableName + " in not enabled", admin.isTableEnabled(tableName));
517        enabledTables.put(tableName, freshTableDesc);
518        LOG.info("Enabled table :" + freshTableDesc);
519      } catch (Exception e){
520        LOG.warn("Caught exception in action: " + this.getClass());
521        // TODO workaround
522        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
523        // operations 1) when enable/disable starts, the table state is changed to
524        // ENABLING/DISABLING (ZK node in 1.x), which will be further changed to ENABLED/DISABLED
525        // once the operation completes 2) if master failover happens in the middle of the
526        // enable/disable operation, the new master will try to recover the tables in
527        // ENABLING/DISABLING state, as programmed in
528        // AssignmentManager#recoverTableInEnablingState() and
529        // AssignmentManager#recoverTableInDisablingState()
530        // 3) after the new master initialization completes, the procedure tries to re-do the
531        // enable/disable operation, which was already done. Ignore those exceptions before
532        // change of behaviors of AssignmentManager in presence of PV2
533        if (e instanceof TableNotDisabledException) {
534          LOG.warn("Caught TableNotDisabledException in action: " + this.getClass());
535          e.printStackTrace();
536        } else {
537          throw e;
538        }
539      } finally {
540        admin.close();
541      }
542    }
543  }
544
545  private class DeleteTableAction extends TableAction {
546
547    @Override
548    void perform() throws IOException {
549
550      TableDescriptor selected = selectTable(disabledTables);
551      if (selected == null) {
552        return;
553      }
554
555      Admin admin = connection.getAdmin();
556      try {
557        TableName tableName = selected.getTableName();
558        LOG.info("Deleting table :" + selected);
559        admin.deleteTable(tableName);
560        Assert.assertFalse("Table: " + selected + " was not deleted",
561                admin.tableExists(tableName));
562        deletedTables.put(tableName, selected);
563        LOG.info("Deleted table :" + selected);
564      } catch (Exception e) {
565        LOG.warn("Caught exception in action: " + this.getClass());
566        throw e;
567      } finally {
568        admin.close();
569      }
570    }
571  }
572
573
574  private abstract class ColumnAction extends TableAction{
575    // ColumnAction has implemented selectFamily() shared by multiple family Actions
576    protected ColumnFamilyDescriptor selectFamily(TableDescriptor td) {
577      if (td == null) {
578        return null;
579      }
580      ColumnFamilyDescriptor[] families = td.getColumnFamilies();
581      if (families.length == 0){
582        LOG.info("No column families in table: " + td);
583        return null;
584      }
585      ColumnFamilyDescriptor randomCfd = families[RandomUtils.nextInt(0, families.length)];
586      return randomCfd;
587    }
588  }
589
590  private class AddColumnFamilyAction extends ColumnAction {
591
592    @Override
593    void perform() throws IOException {
594      TableDescriptor selected = selectTable(disabledTables);
595      if (selected == null) {
596        return;
597      }
598
599      Admin admin = connection.getAdmin();
600      try {
601        ColumnFamilyDescriptor cfd = createFamilyDesc();
602        if (selected.hasColumnFamily(cfd.getName())){
603          LOG.info(new String(cfd.getName()) + " already exists in table "
604              + selected.getTableName());
605          return;
606        }
607        TableName tableName = selected.getTableName();
608        LOG.info("Adding column family: " + cfd + " to table: " + tableName);
609        admin.addColumnFamily(tableName, cfd);
610        // assertion
611        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
612        Assert.assertTrue("Column family: " + cfd + " was not added",
613            freshTableDesc.hasColumnFamily(cfd.getName()));
614        Assert.assertTrue(
615          "After add column family, Table: " + tableName + " is not disabled",
616          admin.isTableDisabled(tableName));
617        disabledTables.put(tableName, freshTableDesc);
618        LOG.info("Added column family: " + cfd + " to table: " + tableName);
619      } catch (Exception e) {
620        LOG.warn("Caught exception in action: " + this.getClass());
621        throw e;
622      } finally {
623        admin.close();
624      }
625    }
626
627    private ColumnFamilyDescriptor createFamilyDesc() {
628      String familyName = String.format("cf-%010d", RandomUtils.nextInt());
629      return ColumnFamilyDescriptorBuilder.of(familyName);
630    }
631  }
632
633  private class AlterFamilyVersionsAction extends ColumnAction {
634
635    @Override
636    void perform() throws IOException {
637      TableDescriptor selected = selectTable(disabledTables);
638      if (selected == null) {
639        return;
640      }
641      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
642      if (columnDesc == null){
643        return;
644      }
645
646      Admin admin = connection.getAdmin();
647      int versions = RandomUtils.nextInt(0, 10) + 3;
648      try {
649        TableName tableName = selected.getTableName();
650        LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions +
651            " in table: " + tableName);
652
653        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
654            .setMinVersions(versions)
655            .setMaxVersions(versions)
656            .build();
657        TableDescriptor td = TableDescriptorBuilder.newBuilder(selected)
658            .modifyColumnFamily(cfd)
659            .build();
660        admin.modifyTable(td);
661
662        // assertion
663        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
664        ColumnFamilyDescriptor freshColumnDesc = freshTableDesc.getColumnFamily(columnDesc.getName());
665        Assert.assertEquals("Column family: " + columnDesc + " was not altered",
666            freshColumnDesc.getMaxVersions(), versions);
667        Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered",
668            freshColumnDesc.getMinVersions(), versions);
669        Assert.assertTrue(
670          "After alter versions of column family, Table: " + tableName + " is not disabled",
671          admin.isTableDisabled(tableName));
672        disabledTables.put(tableName, freshTableDesc);
673        LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions +
674          " in table: " + tableName);
675      } catch (Exception e) {
676        LOG.warn("Caught exception in action: " + this.getClass());
677        throw e;
678      } finally {
679        admin.close();
680      }
681    }
682  }
683
684  private class AlterFamilyEncodingAction extends ColumnAction {
685
686    @Override
687    void perform() throws IOException {
688      TableDescriptor selected = selectTable(disabledTables);
689      if (selected == null) {
690        return;
691      }
692      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
693      if (columnDesc == null){
694        return;
695      }
696
697      Admin admin = connection.getAdmin();
698      try {
699        TableName tableName = selected.getTableName();
700        // possible DataBlockEncoding ids
701        DataBlockEncoding[] possibleIds = {DataBlockEncoding.NONE, DataBlockEncoding.PREFIX,
702                DataBlockEncoding.DIFF, DataBlockEncoding.FAST_DIFF, DataBlockEncoding.ROW_INDEX_V1};
703        short id = possibleIds[RandomUtils.nextInt(0, possibleIds.length)].getId();
704        LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id +
705            " in table: " + tableName);
706
707        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
708            .setDataBlockEncoding(DataBlockEncoding.getEncodingById(id))
709            .build();
710        TableDescriptor td = TableDescriptorBuilder.newBuilder(selected)
711            .modifyColumnFamily(cfd)
712            .build();
713        admin.modifyTable(td);
714
715        // assertion
716        TableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
717        ColumnFamilyDescriptor freshColumnDesc = freshTableDesc.getColumnFamily(columnDesc.getName());
718        Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered",
719            freshColumnDesc.getDataBlockEncoding().getId(), id);
720        Assert.assertTrue(
721          "After alter encoding of column family, Table: " + tableName + " is not disabled",
722          admin.isTableDisabled(tableName));
723        disabledTables.put(tableName, freshTableDesc);
724        LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id +
725          " in table: " + tableName);
726      } catch (Exception e) {
727        LOG.warn("Caught exception in action: " + this.getClass());
728        throw e;
729      } finally {
730        admin.close();
731      }
732    }
733  }
734
735  private class DeleteColumnFamilyAction extends ColumnAction {
736
737    @Override
738    void perform() throws IOException {
739      TableDescriptor selected = selectTable(disabledTables);
740      ColumnFamilyDescriptor cfd = selectFamily(selected);
741      if (selected == null || cfd == null) {
742        return;
743      }
744
745      Admin admin = connection.getAdmin();
746      try {
747        if (selected.getColumnFamilyCount() < 2) {
748          LOG.info("No enough column families to delete in table " + selected.getTableName());
749          return;
750        }
751        TableName tableName = selected.getTableName();
752        LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
753        admin.deleteColumnFamily(tableName, cfd.getName());
754        // assertion
755        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
756        Assert.assertFalse("Column family: " + cfd + " was not added",
757            freshTableDesc.hasColumnFamily(cfd.getName()));
758        Assert.assertTrue(
759          "After delete column family, Table: " + tableName + " is not disabled",
760          admin.isTableDisabled(tableName));
761        disabledTables.put(tableName, freshTableDesc);
762        LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
763      } catch (Exception e) {
764        LOG.warn("Caught exception in action: " + this.getClass());
765        throw e;
766      } finally {
767        admin.close();
768      }
769    }
770  }
771
772  private class AddRowAction extends ColumnAction {
773    // populate tables
774    @Override
775    void perform() throws IOException {
776      TableDescriptor selected = selectTable(enabledTables);
777      if (selected == null ) {
778        return;
779      }
780
781      Admin admin = connection.getAdmin();
782      TableName tableName = selected.getTableName();
783      try (Table table = connection.getTable(tableName)){
784        ArrayList<HRegionInfo> regionInfos = new ArrayList<>(admin.getTableRegions(
785            selected.getTableName()));
786        int numRegions = regionInfos.size();
787        // average number of rows to be added per action to each region
788        int average_rows = 1;
789        int numRows = average_rows * numRegions;
790        LOG.info("Adding " + numRows + " rows to table: " + selected);
791        for (int i = 0; i < numRows; i++){
792          // nextInt(Integer.MAX_VALUE)) to return positive numbers only
793          byte[] rowKey = Bytes.toBytes(
794              "row-" + String.format("%010d", RandomUtils.nextInt()));
795          ColumnFamilyDescriptor cfd = selectFamily(selected);
796          if (cfd == null){
797            return;
798          }
799          byte[] family = cfd.getName();
800          byte[] qualifier = Bytes.toBytes("col-" + RandomUtils.nextInt() % 10);
801          byte[] value = Bytes.toBytes("val-" + RandomStringUtils.randomAlphanumeric(10));
802          Put put = new Put(rowKey);
803          put.addColumn(family, qualifier, value);
804          table.put(put);
805        }
806        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
807        Assert.assertTrue(
808          "After insert, Table: " + tableName + " in not enabled", admin.isTableEnabled(tableName));
809        enabledTables.put(tableName, freshTableDesc);
810        LOG.info("Added " + numRows + " rows to table: " + selected);
811      } catch (Exception e) {
812        LOG.warn("Caught exception in action: " + this.getClass());
813        throw e;
814      } finally {
815        admin.close();
816      }
817    }
818  }
819
  /**
   * Operations a {@code Worker} picks from at random (via {@code ACTION.values()}) on every
   * iteration of its loop. Some are throttled when picked: CREATE_TABLE is skipped once table
   * creation has been switched off halfway through the run, and DELETE_TABLE /
   * DELETE_COLUMNFAMILY are only carried out about 20% of the time they are selected.
   */
  private enum ACTION {
    CREATE_NAMESPACE,
    MODIFY_NAMESPACE,
    DELETE_NAMESPACE,
    CREATE_TABLE,
    DISABLE_TABLE,
    ENABLE_TABLE,
    DELETE_TABLE,
    ADD_COLUMNFAMILY,
    DELETE_COLUMNFAMILY,
    ALTER_FAMILYVERSIONS,
    ALTER_FAMILYENCODING,
    ADD_ROW
  }
834
835  private class Worker extends Thread {
836
837    private Exception savedException;
838
839    private ACTION action;
840
841    @Override
842    public void run() {
843      while (running.get()) {
844        // select random action
845        ACTION selectedAction = ACTION.values()[RandomUtils.nextInt() % ACTION.values().length];
846        this.action = selectedAction;
847        LOG.info("Performing Action: " + selectedAction);
848
849        try {
850          switch (selectedAction) {
851          case CREATE_NAMESPACE:
852            new CreateNamespaceAction().perform();
853            break;
854          case MODIFY_NAMESPACE:
855            new ModifyNamespaceAction().perform();
856            break;
857          case DELETE_NAMESPACE:
858            new DeleteNamespaceAction().perform();
859            break;
860          case CREATE_TABLE:
861            // stop creating new tables in the later stage of the test to avoid too many empty
862            // tables
863            if (create_table.get()) {
864              new CreateTableAction().perform();
865            }
866            break;
867          case ADD_ROW:
868            new AddRowAction().perform();
869            break;
870          case DISABLE_TABLE:
871            new DisableTableAction().perform();
872            break;
873          case ENABLE_TABLE:
874            new EnableTableAction().perform();
875            break;
876          case DELETE_TABLE:
877            // reduce probability of deleting table to 20%
878            if (RandomUtils.nextInt(0, 100) < 20) {
879              new DeleteTableAction().perform();
880            }
881            break;
882          case ADD_COLUMNFAMILY:
883            new AddColumnFamilyAction().perform();
884            break;
885          case DELETE_COLUMNFAMILY:
886            // reduce probability of deleting column family to 20%
887            if (RandomUtils.nextInt(0, 100) < 20) {
888              new DeleteColumnFamilyAction().perform();
889            }
890            break;
891          case ALTER_FAMILYVERSIONS:
892            new AlterFamilyVersionsAction().perform();
893            break;
894          case ALTER_FAMILYENCODING:
895            new AlterFamilyEncodingAction().perform();
896            break;
897          }
898        } catch (Exception ex) {
899          this.savedException = ex;
900          return;
901        }
902      }
903      LOG.info(this.getName() + " stopped");
904    }
905
906    public Exception getSavedException(){
907      return this.savedException;
908    }
909
910    public ACTION getAction(){
911      return this.action;
912    }
913  }
914
915  private void checkException(List<Worker> workers){
916    if(workers == null || workers.isEmpty())
917      return;
918    for (Worker worker : workers){
919      Exception e = worker.getSavedException();
920      if (e != null) {
921        LOG.error("Found exception in thread: " + worker.getName());
922        e.printStackTrace();
923      }
924      Assert.assertNull("Action failed: " + worker.getAction() + " in thread: "
925          + worker.getName(), e);
926    }
927  }
928
929  private int runTest() throws Exception {
930    LOG.info("Starting the test");
931
932    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
933    long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);
934
935    String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
936    numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);
937
938    ArrayList<Worker> workers = new ArrayList<>(numThreads);
939    for (int i = 0; i < numThreads; i++) {
940      checkException(workers);
941      Worker worker = new Worker();
942      LOG.info("Launching worker thread " + worker.getName());
943      workers.add(worker);
944      worker.start();
945    }
946
947    Threads.sleep(runtime / 2);
948    LOG.info("Stopping creating new tables");
949    create_table.set(false);
950    Threads.sleep(runtime / 2);
951    LOG.info("Runtime is up");
952    running.set(false);
953
954    checkException(workers);
955
956    for (Worker worker : workers) {
957      worker.join();
958    }
959    LOG.info("All Worker threads stopped");
960
961    // verify
962    LOG.info("Verify actions of all threads succeeded");
963    checkException(workers);
964    LOG.info("Verify namespaces");
965    verifyNamespaces();
966    LOG.info("Verify states of all tables");
967    verifyTables();
968
969    // RUN HBCK
970
971    HBaseFsck hbck = null;
972    try {
973      LOG.info("Running hbck");
974      hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
975      if (HbckTestingUtil.inconsistencyFound(hbck)) {
976        // Find the inconsistency during HBCK. Leave table and namespace undropped so that
977        // we can check outside the test.
978        keepObjectsAtTheEnd = true;
979      }
980      HbckTestingUtil.assertNoErrors(hbck);
981      LOG.info("Finished hbck");
982    } finally {
983      if (hbck != null) {
984        hbck.close();
985      }
986    }
987     return 0;
988  }
989
  /**
   * Not used by this test, which manages its own set of tables.
   *
   * @return always {@code null}
   */
  @Override
  public TableName getTablename() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }
994
  /**
   * Not used by this test, which creates and alters column families itself.
   *
   * @return always {@code null}
   */
  @Override
  protected Set<String> getColumnFamilies() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }
999
1000  public static void main(String[] args) throws Exception {
1001    Configuration conf = HBaseConfiguration.create();
1002    IntegrationTestingUtility.setUseDistributedCluster(conf);
1003    IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
1004    Connection connection = null;
1005    int ret = 1;
1006    try {
1007      // Initialize connection once, then pass to Actions
1008      LOG.debug("Setting up connection ...");
1009      connection = ConnectionFactory.createConnection(conf);
1010      masterFailover.setConnection(connection);
1011      ret = ToolRunner.run(conf, masterFailover, args);
1012    } catch (IOException e){
1013      LOG.error(HBaseMarkers.FATAL, "Failed to establish connection. Aborting test ...", e);
1014    } finally {
1015      connection = masterFailover.getConnection();
1016      if (connection != null){
1017        connection.close();
1018      }
1019      System.exit(ret);
1020    }
1021  }
1022}