001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Set;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.regex.Pattern;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
042import org.apache.hadoop.hbase.log.HBaseMarkers;
043import org.apache.hadoop.hbase.testclassification.IntegrationTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.util.HBaseFsck;
046import org.apache.hadoop.hbase.util.Threads;
047import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
048import org.apache.hadoop.util.ToolRunner;
049import org.junit.Assert;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * Integration test that verifies Procedure V2. DDL operations should go through (rollforward or
057 * rollback) when primary master is killed by ChaosMonkey (default MASTER_KILLING).
 * <p>
 * Multiple Worker threads are started; each one randomly performs the following Actions in a loop.
 * </p>
 * Actions generating and populating tables:
062 * <ul>
063 * <li>CreateTableAction</li>
064 * <li>DisableTableAction</li>
065 * <li>EnableTableAction</li>
066 * <li>DeleteTableAction</li>
067 * <li>AddRowAction</li>
068 * </ul>
069 * Actions performing column family DDL operations:
070 * <ul>
071 * <li>AddColumnFamilyAction</li>
 * <li>AlterFamilyVersionsAction</li>
 * <li>AlterFamilyEncodingAction</li>
074 * <li>DeleteColumnFamilyAction</li>
075 * </ul>
076 * Actions performing namespace DDL operations:
077 * <ul>
 * <li>CreateNamespaceAction</li>
 * <li>ModifyNamespaceAction</li>
080 * <li>DeleteNamespaceAction</li>
081 * </ul>
082 * <br/>
083 * The threads run for a period of time (default 20 minutes) then are stopped at the end of runtime.
 * Verification then checks the following:
085 * <ol>
086 * <li>No Actions throw Exceptions.</li>
087 * <li>No inconsistencies are detected in hbck.</li>
088 * </ol>
089 * <p>
090 * This test should be run by the hbase user since it invokes hbck at the end
091 * </p>
092 * <p>
093 * Usage: hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
094 * -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
095 * -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
096 * -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
097 */
098
099@Category(IntegrationTests.class)
100public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {
101
102  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestDDLMasterFailover.class);
103
104  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
105
106  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
107
108  protected static final int DEFAULT_NUM_THREADS = 20;
109
110  protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables
111
112  private boolean keepObjectsAtTheEnd = false;
113  protected HBaseClusterInterface cluster;
114
115  protected Connection connection;
116
117  /**
118   * A soft limit on how long we should run
119   */
120  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
121  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
122  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";
123
124  protected AtomicBoolean running = new AtomicBoolean(true);
125
126  protected AtomicBoolean create_table = new AtomicBoolean(true);
127
128  protected int numThreads, numRegions;
129
130  ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap = new ConcurrentHashMap<>();
131
132  ConcurrentHashMap<TableName, TableDescriptor> enabledTables = new ConcurrentHashMap<>();
133
134  ConcurrentHashMap<TableName, TableDescriptor> disabledTables = new ConcurrentHashMap<>();
135
136  ConcurrentHashMap<TableName, TableDescriptor> deletedTables = new ConcurrentHashMap<>();
137
138  @Override
139  public void setUpCluster() throws Exception {
140    util = getTestingUtil(getConf());
141    LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
142    util.initializeCluster(getMinServerCount());
143    LOG.debug("Done initializing/checking cluster");
144    cluster = util.getHBaseClusterInterface();
145  }
146
147  @Override
148  public void cleanUpCluster() throws Exception {
149    if (!keepObjectsAtTheEnd) {
150      Admin admin = util.getAdmin();
151      for (TableName tableName : admin.listTableNames(Pattern.compile("ittable-\\d+"))) {
152        admin.disableTable(tableName);
153        admin.deleteTable(tableName);
154      }
155      NamespaceDescriptor[] nsds = admin.listNamespaceDescriptors();
156      for (NamespaceDescriptor nsd : nsds) {
157        if (nsd.getName().matches("itnamespace\\d+")) {
158          LOG.info("Removing namespace=" + nsd.getName());
159          admin.deleteNamespace(nsd.getName());
160        }
161      }
162    }
163
164    enabledTables.clear();
165    disabledTables.clear();
166    deletedTables.clear();
167    namespaceMap.clear();
168
169    Connection connection = getConnection();
170    connection.close();
171    super.cleanUpCluster();
172  }
173
174  protected int getMinServerCount() {
175    return SERVER_COUNT;
176  }
177
178  protected synchronized void setConnection(Connection connection) {
179    this.connection = connection;
180  }
181
182  protected synchronized Connection getConnection() {
183    if (this.connection == null) {
184      try {
185        Connection connection = ConnectionFactory.createConnection(getConf());
186        setConnection(connection);
187      } catch (IOException e) {
188        LOG.error(HBaseMarkers.FATAL, "Failed to establish connection.", e);
189      }
190    }
191    return connection;
192  }
193
194  protected void verifyNamespaces() throws IOException {
195    Connection connection = getConnection();
196    Admin admin = connection.getAdmin();
197    // iterating concurrent map
198    for (String nsName : namespaceMap.keySet()) {
199      try {
200        assertTrue(admin.getNamespaceDescriptor(nsName) != null,
201          "Namespace: " + nsName + " in namespaceMap does not exist");
202      } catch (NamespaceNotFoundException nsnfe) {
203        Assert
204          .fail("Namespace: " + nsName + " in namespaceMap does not exist: " + nsnfe.getMessage());
205      }
206    }
207    admin.close();
208  }
209
210  protected void verifyTables() throws IOException {
211    Connection connection = getConnection();
212    Admin admin = connection.getAdmin();
213    // iterating concurrent map
214    for (TableName tableName : enabledTables.keySet()) {
215      assertTrue(admin.isTableEnabled(tableName),
216        "Table: " + tableName + " in enabledTables is not enabled");
217    }
218    for (TableName tableName : disabledTables.keySet()) {
219      assertTrue(admin.isTableDisabled(tableName),
220        "Table: " + tableName + " in disabledTables is not disabled");
221    }
222    for (TableName tableName : deletedTables.keySet()) {
223      Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
224        admin.tableExists(tableName));
225    }
226    admin.close();
227  }
228
229  @Test
230  public void testAsUnitTest() throws Exception {
231    runTest();
232  }
233
234  @Override
235  public int runTestFromCommandLine() throws Exception {
236    int ret = runTest();
237    return ret;
238  }
239
240  private abstract class MasterAction {
241    Connection connection = getConnection();
242
243    abstract void perform() throws IOException;
244  }
245
246  private abstract class NamespaceAction extends MasterAction {
247    final String nsTestConfigKey = "hbase.namespace.testKey";
248
249    // NamespaceAction has implemented selectNamespace() shared by multiple namespace Actions
250    protected NamespaceDescriptor
251      selectNamespace(ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap) {
252      // synchronization to prevent removal from multiple threads
253      synchronized (namespaceMap) {
254        // randomly select namespace from namespaceMap
255        if (namespaceMap.isEmpty()) {
256          return null;
257        }
258        ArrayList<String> namespaceList = new ArrayList<>(namespaceMap.keySet());
259        String randomKey =
260          namespaceList.get(ThreadLocalRandom.current().nextInt(namespaceList.size()));
261        NamespaceDescriptor randomNsd = namespaceMap.get(randomKey);
262        // remove from namespaceMap
263        namespaceMap.remove(randomKey);
264        return randomNsd;
265      }
266    }
267  }
268
269  private class CreateNamespaceAction extends NamespaceAction {
270    @Override
271    void perform() throws IOException {
272      Admin admin = connection.getAdmin();
273      try {
274        NamespaceDescriptor nsd;
275        while (true) {
276          nsd = createNamespaceDesc();
277          try {
278            if (admin.getNamespaceDescriptor(nsd.getName()) != null) {
279              // the namespace has already existed.
280              continue;
281            } else {
282              // currently, the code never return null - always throws exception if
283              // namespace is not found - this just a defensive programming to make
284              // sure null situation is handled in case the method changes in the
285              // future.
286              break;
287            }
288          } catch (NamespaceNotFoundException nsnfe) {
289            // This is expected for a random generated NamespaceDescriptor
290            break;
291          }
292        }
293        LOG.info("Creating namespace:" + nsd);
294        admin.createNamespace(nsd);
295        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(nsd.getName());
296        assertTrue(freshNamespaceDesc != null, "Namespace: " + nsd + " was not created");
297        namespaceMap.put(nsd.getName(), freshNamespaceDesc);
298        LOG.info("Created namespace:" + freshNamespaceDesc);
299      } catch (Exception e) {
300        LOG.warn("Caught exception in action: " + this.getClass());
301        throw e;
302      } finally {
303        admin.close();
304      }
305    }
306
307    private NamespaceDescriptor createNamespaceDesc() {
308      String namespaceName =
309        "itnamespace" + String.format("%010d", ThreadLocalRandom.current().nextInt());
310      NamespaceDescriptor nsd = NamespaceDescriptor.create(namespaceName).build();
311
312      nsd.setConfiguration(nsTestConfigKey,
313        String.format("%010d", ThreadLocalRandom.current().nextInt()));
314      return nsd;
315    }
316  }
317
318  private class ModifyNamespaceAction extends NamespaceAction {
319    @Override
320    void perform() throws IOException {
321      NamespaceDescriptor selected = selectNamespace(namespaceMap);
322      if (selected == null) {
323        return;
324      }
325
326      Admin admin = connection.getAdmin();
327      try {
328        String namespaceName = selected.getName();
329        LOG.info("Modifying namespace :" + selected);
330        NamespaceDescriptor modifiedNsd = NamespaceDescriptor.create(namespaceName).build();
331        String nsValueNew;
332        do {
333          nsValueNew = String.format("%010d", ThreadLocalRandom.current().nextInt());
334        } while (selected.getConfigurationValue(nsTestConfigKey).equals(nsValueNew));
335        modifiedNsd.setConfiguration(nsTestConfigKey, nsValueNew);
336        admin.modifyNamespace(modifiedNsd);
337        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(namespaceName);
338        assertTrue(freshNamespaceDesc.getConfigurationValue(nsTestConfigKey).equals(nsValueNew),
339          "Namespace: " + selected + " was not modified");
340        assertTrue(admin.getNamespaceDescriptor(namespaceName) != null,
341          "Namespace: " + namespaceName + " does not exist");
342        namespaceMap.put(namespaceName, freshNamespaceDesc);
343        LOG.info("Modified namespace :" + freshNamespaceDesc);
344      } catch (Exception e) {
345        LOG.warn("Caught exception in action: " + this.getClass());
346        throw e;
347      } finally {
348        admin.close();
349      }
350    }
351  }
352
353  private class DeleteNamespaceAction extends NamespaceAction {
354    @Override
355    void perform() throws IOException {
356      NamespaceDescriptor selected = selectNamespace(namespaceMap);
357      if (selected == null) {
358        return;
359      }
360
361      Admin admin = connection.getAdmin();
362      try {
363        String namespaceName = selected.getName();
364        LOG.info("Deleting namespace :" + selected);
365        admin.deleteNamespace(namespaceName);
366        try {
367          if (admin.getNamespaceDescriptor(namespaceName) != null) {
368            // the namespace still exists.
369            assertTrue(false, "Namespace: " + selected + " was not deleted");
370          } else {
371            LOG.info("Deleted namespace :" + selected);
372          }
373        } catch (NamespaceNotFoundException nsnfe) {
374          // This is expected result
375          LOG.info("Deleted namespace :" + selected);
376        }
377      } catch (Exception e) {
378        LOG.warn("Caught exception in action: " + this.getClass());
379        throw e;
380      } finally {
381        admin.close();
382      }
383    }
384  }
385
386  private abstract class TableAction extends MasterAction {
387    // TableAction has implemented selectTable() shared by multiple table Actions
388    protected TableDescriptor selectTable(ConcurrentHashMap<TableName, TableDescriptor> tableMap) {
389      // synchronization to prevent removal from multiple threads
390      synchronized (tableMap) {
391        // randomly select table from tableMap
392        if (tableMap.isEmpty()) {
393          return null;
394        }
395        ArrayList<TableName> tableList = new ArrayList<>(tableMap.keySet());
396        TableName key = tableList.get(ThreadLocalRandom.current().nextInt(tableList.size()));
397        TableDescriptor randomTd = tableMap.remove(key);
398        return randomTd;
399      }
400    }
401  }
402
403  private class CreateTableAction extends TableAction {
404
405    @Override
406    void perform() throws IOException {
407      Admin admin = connection.getAdmin();
408      try {
409        TableDescriptor td = createTableDesc();
410        TableName tableName = td.getTableName();
411        if (admin.tableExists(tableName)) {
412          return;
413        }
414        String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName());
415        numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
416        byte[] startKey = Bytes.toBytes("row-0000000000");
417        byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
418        LOG.info("Creating table:" + td);
419        admin.createTable(td, startKey, endKey, numRegions);
420        assertTrue(admin.tableExists(tableName), "Table: " + td + " was not created");
421        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
422        assertTrue(admin.isTableEnabled(tableName),
423          "After create, Table: " + tableName + " in not enabled");
424        enabledTables.put(tableName, freshTableDesc);
425        LOG.info("Created table:" + freshTableDesc);
426      } catch (Exception e) {
427        LOG.warn("Caught exception in action: " + this.getClass());
428        throw e;
429      } finally {
430        admin.close();
431      }
432    }
433
434    private TableDescriptor createTableDesc() {
435      String tableName =
436        String.format("ittable-%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
437      String familyName = "cf-" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
438      return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
439        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName)).build();
440    }
441  }
442
443  private class DisableTableAction extends TableAction {
444
445    @Override
446    void perform() throws IOException {
447
448      TableDescriptor selected = selectTable(enabledTables);
449      if (selected == null) {
450        return;
451      }
452
453      Admin admin = connection.getAdmin();
454      try {
455        TableName tableName = selected.getTableName();
456        LOG.info("Disabling table :" + selected);
457        admin.disableTable(tableName);
458        assertTrue(admin.isTableDisabled(tableName), "Table: " + selected + " was not disabled");
459        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
460        assertTrue(admin.isTableDisabled(tableName),
461          "After disable, Table: " + tableName + " is not disabled");
462        disabledTables.put(tableName, freshTableDesc);
463        LOG.info("Disabled table :" + freshTableDesc);
464      } catch (Exception e) {
465        LOG.warn("Caught exception in action: " + this.getClass());
466        // TODO workaround
467        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
468        // operations
469        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
470        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
471        // 2) if master failover happens in the middle of the enable/disable operation, the new
472        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
473        // AssignmentManager#recoverTableInEnablingState() and
474        // AssignmentManager#recoverTableInDisablingState()
475        // 3) after the new master initialization completes, the procedure tries to re-do the
476        // enable/disable operation, which was already done. Ignore those exceptions before change
477        // of behaviors of AssignmentManager in presence of PV2
478        if (e instanceof TableNotEnabledException) {
479          LOG.warn("Caught TableNotEnabledException in action: " + this.getClass());
480          e.printStackTrace();
481        } else {
482          throw e;
483        }
484      } finally {
485        admin.close();
486      }
487    }
488  }
489
490  private class EnableTableAction extends TableAction {
491
492    @Override
493    void perform() throws IOException {
494
495      TableDescriptor selected = selectTable(disabledTables);
496      if (selected == null) {
497        return;
498      }
499
500      Admin admin = connection.getAdmin();
501      try {
502        TableName tableName = selected.getTableName();
503        LOG.info("Enabling table :" + selected);
504        admin.enableTable(tableName);
505        assertTrue(admin.isTableEnabled(tableName), "Table: " + selected + " was not enabled");
506        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
507        assertTrue(admin.isTableEnabled(tableName),
508          "After enable, Table: " + tableName + " in not enabled");
509        enabledTables.put(tableName, freshTableDesc);
510        LOG.info("Enabled table :" + freshTableDesc);
511      } catch (Exception e) {
512        LOG.warn("Caught exception in action: " + this.getClass());
513        // TODO workaround
514        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
515        // operations 1) when enable/disable starts, the table state is changed to
516        // ENABLING/DISABLING (ZK node in 1.x), which will be further changed to ENABLED/DISABLED
517        // once the operation completes 2) if master failover happens in the middle of the
518        // enable/disable operation, the new master will try to recover the tables in
519        // ENABLING/DISABLING state, as programmed in
520        // AssignmentManager#recoverTableInEnablingState() and
521        // AssignmentManager#recoverTableInDisablingState()
522        // 3) after the new master initialization completes, the procedure tries to re-do the
523        // enable/disable operation, which was already done. Ignore those exceptions before
524        // change of behaviors of AssignmentManager in presence of PV2
525        if (e instanceof TableNotDisabledException) {
526          LOG.warn("Caught TableNotDisabledException in action: " + this.getClass());
527          e.printStackTrace();
528        } else {
529          throw e;
530        }
531      } finally {
532        admin.close();
533      }
534    }
535  }
536
537  private class DeleteTableAction extends TableAction {
538
539    @Override
540    void perform() throws IOException {
541
542      TableDescriptor selected = selectTable(disabledTables);
543      if (selected == null) {
544        return;
545      }
546
547      Admin admin = connection.getAdmin();
548      try {
549        TableName tableName = selected.getTableName();
550        LOG.info("Deleting table :" + selected);
551        admin.deleteTable(tableName);
552        Assert.assertFalse("Table: " + selected + " was not deleted", admin.tableExists(tableName));
553        deletedTables.put(tableName, selected);
554        LOG.info("Deleted table :" + selected);
555      } catch (Exception e) {
556        LOG.warn("Caught exception in action: " + this.getClass());
557        throw e;
558      } finally {
559        admin.close();
560      }
561    }
562  }
563
564  private abstract class ColumnAction extends TableAction {
565    // ColumnAction has implemented selectFamily() shared by multiple family Actions
566    protected ColumnFamilyDescriptor selectFamily(TableDescriptor td) {
567      if (td == null) {
568        return null;
569      }
570      ColumnFamilyDescriptor[] families = td.getColumnFamilies();
571      if (families.length == 0) {
572        LOG.info("No column families in table: " + td);
573        return null;
574      }
575      return families[ThreadLocalRandom.current().nextInt(families.length)];
576    }
577  }
578
579  private class AddColumnFamilyAction extends ColumnAction {
580
581    @Override
582    void perform() throws IOException {
583      TableDescriptor selected = selectTable(disabledTables);
584      if (selected == null) {
585        return;
586      }
587
588      Admin admin = connection.getAdmin();
589      try {
590        ColumnFamilyDescriptor cfd = createFamilyDesc();
591        if (selected.hasColumnFamily(cfd.getName())) {
592          LOG.info(
593            Bytes.toString(cfd.getName()) + " already exists in table " + selected.getTableName());
594          return;
595        }
596        TableName tableName = selected.getTableName();
597        LOG.info("Adding column family: " + cfd + " to table: " + tableName);
598        admin.addColumnFamily(tableName, cfd);
599        // assertion
600        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
601        assertTrue(freshTableDesc.hasColumnFamily(cfd.getName()),
602          "Column family: " + cfd + " was not added");
603        assertTrue(admin.isTableDisabled(tableName),
604          "After add column family, Table: " + tableName + " is not disabled");
605        disabledTables.put(tableName, freshTableDesc);
606        LOG.info("Added column family: " + cfd + " to table: " + tableName);
607      } catch (Exception e) {
608        LOG.warn("Caught exception in action: " + this.getClass());
609        throw e;
610      } finally {
611        admin.close();
612      }
613    }
614
615    private ColumnFamilyDescriptor createFamilyDesc() {
616      String familyName = String.format("cf-%010d", ThreadLocalRandom.current().nextInt());
617      return ColumnFamilyDescriptorBuilder.of(familyName);
618    }
619  }
620
621  private class AlterFamilyVersionsAction extends ColumnAction {
622
623    @Override
624    void perform() throws IOException {
625      TableDescriptor selected = selectTable(disabledTables);
626      if (selected == null) {
627        return;
628      }
629      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
630      if (columnDesc == null) {
631        return;
632      }
633
634      Admin admin = connection.getAdmin();
635      int versions = ThreadLocalRandom.current().nextInt(10) + 3;
636      try {
637        TableName tableName = selected.getTableName();
638        LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions
639          + " in table: " + tableName);
640
641        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
642          .setMinVersions(versions).setMaxVersions(versions).build();
643        TableDescriptor td =
644          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
645        admin.modifyTable(td);
646
647        // assertion
648        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
649        ColumnFamilyDescriptor freshColumnDesc =
650          freshTableDesc.getColumnFamily(columnDesc.getName());
651        Assert.assertEquals("Column family: " + columnDesc + " was not altered",
652          freshColumnDesc.getMaxVersions(), versions);
653        Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered",
654          freshColumnDesc.getMinVersions(), versions);
655        assertTrue(admin.isTableDisabled(tableName),
656          "After alter versions of column family, Table: " + tableName + " is not disabled");
657        disabledTables.put(tableName, freshTableDesc);
658        LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions
659          + " in table: " + tableName);
660      } catch (Exception e) {
661        LOG.warn("Caught exception in action: " + this.getClass());
662        throw e;
663      } finally {
664        admin.close();
665      }
666    }
667  }
668
669  private class AlterFamilyEncodingAction extends ColumnAction {
670
671    @Override
672    void perform() throws IOException {
673      TableDescriptor selected = selectTable(disabledTables);
674      if (selected == null) {
675        return;
676      }
677      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
678      if (columnDesc == null) {
679        return;
680      }
681
682      Admin admin = connection.getAdmin();
683      try {
684        TableName tableName = selected.getTableName();
685        // possible DataBlockEncoding ids
686        DataBlockEncoding[] possibleIds = { DataBlockEncoding.NONE, DataBlockEncoding.PREFIX,
687          DataBlockEncoding.DIFF, DataBlockEncoding.FAST_DIFF, DataBlockEncoding.ROW_INDEX_V1 };
688        short id = possibleIds[ThreadLocalRandom.current().nextInt(possibleIds.length)].getId();
689        LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id + " in table: "
690          + tableName);
691
692        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
693          .setDataBlockEncoding(DataBlockEncoding.getEncodingById(id)).build();
694        TableDescriptor td =
695          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
696        admin.modifyTable(td);
697
698        // assertion
699        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
700        ColumnFamilyDescriptor freshColumnDesc =
701          freshTableDesc.getColumnFamily(columnDesc.getName());
702        Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered",
703          freshColumnDesc.getDataBlockEncoding().getId(), id);
704        Assert.assertTrue(
705          "After alter encoding of column family, Table: " + tableName + " is not disabled",
706          admin.isTableDisabled(tableName));
707        disabledTables.put(tableName, freshTableDesc);
708        LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id
709          + " in table: " + tableName);
710      } catch (Exception e) {
711        LOG.warn("Caught exception in action: " + this.getClass());
712        throw e;
713      } finally {
714        admin.close();
715      }
716    }
717  }
718
719  private class DeleteColumnFamilyAction extends ColumnAction {
720
721    @Override
722    void perform() throws IOException {
723      TableDescriptor selected = selectTable(disabledTables);
724      ColumnFamilyDescriptor cfd = selectFamily(selected);
725      if (selected == null || cfd == null) {
726        return;
727      }
728
729      Admin admin = connection.getAdmin();
730      try {
731        if (selected.getColumnFamilyCount() < 2) {
732          LOG.info("No enough column families to delete in table " + selected.getTableName());
733          return;
734        }
735        TableName tableName = selected.getTableName();
736        LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
737        admin.deleteColumnFamily(tableName, cfd.getName());
738        // assertion
739        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
740        Assert.assertFalse("Column family: " + cfd + " was not added",
741          freshTableDesc.hasColumnFamily(cfd.getName()));
742        Assert.assertTrue("After delete column family, Table: " + tableName + " is not disabled",
743          admin.isTableDisabled(tableName));
744        disabledTables.put(tableName, freshTableDesc);
745        LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
746      } catch (Exception e) {
747        LOG.warn("Caught exception in action: " + this.getClass());
748        throw e;
749      } finally {
750        admin.close();
751      }
752    }
753  }
754
755  private class AddRowAction extends ColumnAction {
756    // populate tables
757    @Override
758    void perform() throws IOException {
759      TableDescriptor selected = selectTable(enabledTables);
760      if (selected == null) {
761        return;
762      }
763
764      Admin admin = connection.getAdmin();
765      TableName tableName = selected.getTableName();
766      try (Table table = connection.getTable(tableName)) {
767        ArrayList<RegionInfo> regionInfos =
768          new ArrayList<>(admin.getRegions(selected.getTableName()));
769        int numRegions = regionInfos.size();
770        // average number of rows to be added per action to each region
771        int average_rows = 1;
772        int numRows = average_rows * numRegions;
773        LOG.info("Adding " + numRows + " rows to table: " + selected);
774        byte[] value = new byte[10];
775        for (int i = 0; i < numRows; i++) {
776          // nextInt(Integer.MAX_VALUE)) to return positive numbers only
777          byte[] rowKey =
778            Bytes.toBytes("row-" + String.format("%010d", ThreadLocalRandom.current().nextInt()));
779          ColumnFamilyDescriptor cfd = selectFamily(selected);
780          if (cfd == null) {
781            return;
782          }
783          byte[] family = cfd.getName();
784          byte[] qualifier = Bytes.toBytes("col-" + ThreadLocalRandom.current().nextInt(10));
785          Bytes.random(value);
786          Put put = new Put(rowKey);
787          put.addColumn(family, qualifier, value);
788          table.put(put);
789        }
790        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
791        Assert.assertTrue("After insert, Table: " + tableName + " in not enabled",
792          admin.isTableEnabled(tableName));
793        enabledTables.put(tableName, freshTableDesc);
794        LOG.info("Added " + numRows + " rows to table: " + selected);
795      } catch (Exception e) {
796        LOG.warn("Caught exception in action: " + this.getClass());
797        throw e;
798      } finally {
799        admin.close();
800      }
801    }
802  }
803
  /**
   * Kinds of operation a worker thread can perform. {@code Worker.run()} indexes
   * {@code ACTION.values()} with a random number on every iteration, so the declaration order
   * determines which index maps to which action.
   */
  private enum ACTION {
    CREATE_NAMESPACE,
    MODIFY_NAMESPACE,
    DELETE_NAMESPACE,
    // only performed while the create_table flag is still set
    CREATE_TABLE,
    DISABLE_TABLE,
    ENABLE_TABLE,
    // performed on only about 20% of the iterations that select it
    DELETE_TABLE,
    ADD_COLUMNFAMILY,
    // performed on only about 20% of the iterations that select it
    DELETE_COLUMNFAMILY,
    ALTER_FAMILYVERSIONS,
    ALTER_FAMILYENCODING,
    ADD_ROW
  }
818
819  private class Worker extends Thread {
820
821    private Exception savedException;
822
823    private ACTION action;
824
825    @Override
826    public void run() {
827      while (running.get()) {
828        // select random action
829        ACTION selectedAction =
830          ACTION.values()[ThreadLocalRandom.current().nextInt(ACTION.values().length)];
831        this.action = selectedAction;
832        LOG.info("Performing Action: " + selectedAction);
833
834        try {
835          switch (selectedAction) {
836            case CREATE_NAMESPACE:
837              new CreateNamespaceAction().perform();
838              break;
839            case MODIFY_NAMESPACE:
840              new ModifyNamespaceAction().perform();
841              break;
842            case DELETE_NAMESPACE:
843              new DeleteNamespaceAction().perform();
844              break;
845            case CREATE_TABLE:
846              // stop creating new tables in the later stage of the test to avoid too many empty
847              // tables
848              if (create_table.get()) {
849                new CreateTableAction().perform();
850              }
851              break;
852            case ADD_ROW:
853              new AddRowAction().perform();
854              break;
855            case DISABLE_TABLE:
856              new DisableTableAction().perform();
857              break;
858            case ENABLE_TABLE:
859              new EnableTableAction().perform();
860              break;
861            case DELETE_TABLE:
862              // reduce probability of deleting table to 20%
863              if (ThreadLocalRandom.current().nextInt(100) < 20) {
864                new DeleteTableAction().perform();
865              }
866              break;
867            case ADD_COLUMNFAMILY:
868              new AddColumnFamilyAction().perform();
869              break;
870            case DELETE_COLUMNFAMILY:
871              // reduce probability of deleting column family to 20%
872              if (ThreadLocalRandom.current().nextInt(100) < 20) {
873                new DeleteColumnFamilyAction().perform();
874              }
875              break;
876            case ALTER_FAMILYVERSIONS:
877              new AlterFamilyVersionsAction().perform();
878              break;
879            case ALTER_FAMILYENCODING:
880              new AlterFamilyEncodingAction().perform();
881              break;
882          }
883        } catch (Exception ex) {
884          this.savedException = ex;
885          return;
886        }
887      }
888      LOG.info(this.getName() + " stopped");
889    }
890
891    public Exception getSavedException() {
892      return this.savedException;
893    }
894
895    public ACTION getAction() {
896      return this.action;
897    }
898  }
899
900  private void checkException(List<Worker> workers) {
901    if (workers == null || workers.isEmpty()) return;
902    for (Worker worker : workers) {
903      Exception e = worker.getSavedException();
904      if (e != null) {
905        LOG.error("Found exception in thread: " + worker.getName());
906        e.printStackTrace();
907      }
908      Assert.assertNull("Action failed: " + worker.getAction() + " in thread: " + worker.getName(),
909        e);
910    }
911  }
912
913  private int runTest() throws Exception {
914    LOG.info("Starting the test");
915
916    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
917    long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);
918
919    String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
920    numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);
921
922    ArrayList<Worker> workers = new ArrayList<>(numThreads);
923    for (int i = 0; i < numThreads; i++) {
924      checkException(workers);
925      Worker worker = new Worker();
926      LOG.info("Launching worker thread " + worker.getName());
927      workers.add(worker);
928      worker.start();
929    }
930
931    Threads.sleep(runtime / 2);
932    LOG.info("Stopping creating new tables");
933    create_table.set(false);
934    Threads.sleep(runtime / 2);
935    LOG.info("Runtime is up");
936    running.set(false);
937
938    checkException(workers);
939
940    for (Worker worker : workers) {
941      worker.join();
942    }
943    LOG.info("All Worker threads stopped");
944
945    // verify
946    LOG.info("Verify actions of all threads succeeded");
947    checkException(workers);
948    LOG.info("Verify namespaces");
949    verifyNamespaces();
950    LOG.info("Verify states of all tables");
951    verifyTables();
952
953    // RUN HBCK
954
955    HBaseFsck hbck = null;
956    try {
957      LOG.info("Running hbck");
958      hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
959      if (HbckTestingUtil.inconsistencyFound(hbck)) {
960        // Find the inconsistency during HBCK. Leave table and namespace undropped so that
961        // we can check outside the test.
962        keepObjectsAtTheEnd = true;
963      }
964      HbckTestingUtil.assertNoErrors(hbck);
965      LOG.info("Finished hbck");
966    } finally {
967      if (hbck != null) {
968        hbck.close();
969      }
970    }
971    return 0;
972  }
973
  /**
   * Always returns {@code null}: this test does not expose a single table name.
   */
  @Override
  public TableName getTablename() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }
978
  /**
   * Always returns {@code null}: this test does not expose a fixed set of column families.
   */
  @Override
  protected Set<String> getColumnFamilies() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }
983
984  public static void main(String[] args) throws Exception {
985    Configuration conf = HBaseConfiguration.create();
986    IntegrationTestingUtility.setUseDistributedCluster(conf);
987    IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
988    Connection connection = null;
989    int ret = 1;
990    try {
991      // Initialize connection once, then pass to Actions
992      LOG.debug("Setting up connection ...");
993      connection = ConnectionFactory.createConnection(conf);
994      masterFailover.setConnection(connection);
995      ret = ToolRunner.run(conf, masterFailover, args);
996    } catch (IOException e) {
997      LOG.error(HBaseMarkers.FATAL, "Failed to establish connection. Aborting test ...", e);
998    } finally {
999      connection = masterFailover.getConnection();
1000      if (connection != null) {
1001        connection.close();
1002      }
1003      System.exit(ret);
1004    }
1005  }
1006}