001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ThreadLocalRandom;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.regex.Pattern;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.Put;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.client.TableDescriptor;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
046import org.apache.hadoop.hbase.log.HBaseMarkers;
047import org.apache.hadoop.hbase.testclassification.IntegrationTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.HBaseFsck;
050import org.apache.hadoop.hbase.util.Threads;
051import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
052import org.apache.hadoop.util.ToolRunner;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
059 * Integration test that verifies Procedure V2. DDL operations should go through (rollforward or
060 * rollback) when primary master is killed by ChaosMonkey (default MASTER_KILLING).
 * <p>
 * Multiple worker threads are started; each repeatedly performs randomly chosen actions from the
 * groups below. Actions creating and populating tables:
065 * <ul>
066 * <li>CreateTableAction</li>
067 * <li>DisableTableAction</li>
068 * <li>EnableTableAction</li>
069 * <li>DeleteTableAction</li>
070 * <li>AddRowAction</li>
071 * </ul>
072 * Actions performing column family DDL operations:
073 * <ul>
 * <li>AddColumnFamilyAction</li>
 * <li>AlterFamilyVersionsAction</li>
 * <li>AlterFamilyEncodingAction</li>
 * <li>DeleteColumnFamilyAction</li>
078 * </ul>
079 * Actions performing namespace DDL operations:
080 * <ul>
 * <li>CreateNamespaceAction</li>
 * <li>ModifyNamespaceAction</li>
 * <li>DeleteNamespaceAction</li>
084 * </ul>
085 * <br/>
 * The threads run for a period of time (default 20 minutes) and are stopped at the end of the
 * runtime. Verification is performed against the following checkpoints:
088 * <ol>
089 * <li>No Actions throw Exceptions.</li>
090 * <li>No inconsistencies are detected in hbck.</li>
091 * </ol>
092 * <p>
 * This test should be run by the hbase user since it invokes hbck at the end.
094 * </p>
095 * <p>
096 * Usage: hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
097 * -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
098 * -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
099 * -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
100 */
101
102@Tag(IntegrationTests.TAG)
103public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {
104
105  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestDDLMasterFailover.class);
106
107  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
108
109  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
110
111  protected static final int DEFAULT_NUM_THREADS = 20;
112
113  protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables
114
115  private boolean keepObjectsAtTheEnd = false;
116  protected HBaseClusterInterface cluster;
117
118  protected Connection connection;
119
120  /**
121   * A soft limit on how long we should run
122   */
123  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
124  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
125  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";
126
127  protected AtomicBoolean running = new AtomicBoolean(true);
128
129  protected AtomicBoolean create_table = new AtomicBoolean(true);
130
131  protected int numThreads, numRegions;
132
133  ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap = new ConcurrentHashMap<>();
134
135  ConcurrentHashMap<TableName, TableDescriptor> enabledTables = new ConcurrentHashMap<>();
136
137  ConcurrentHashMap<TableName, TableDescriptor> disabledTables = new ConcurrentHashMap<>();
138
139  ConcurrentHashMap<TableName, TableDescriptor> deletedTables = new ConcurrentHashMap<>();
140
141  @Override
142  public void setUpCluster() throws Exception {
143    util = getTestingUtil(getConf());
144    LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
145    util.initializeCluster(getMinServerCount());
146    LOG.debug("Done initializing/checking cluster");
147    cluster = util.getHBaseClusterInterface();
148  }
149
150  @Override
151  public void cleanUpCluster() throws Exception {
152    if (!keepObjectsAtTheEnd) {
153      Admin admin = util.getAdmin();
154      for (TableName tableName : admin.listTableNames(Pattern.compile("ittable-\\d+"))) {
155        admin.disableTable(tableName);
156        admin.deleteTable(tableName);
157      }
158      NamespaceDescriptor[] nsds = admin.listNamespaceDescriptors();
159      for (NamespaceDescriptor nsd : nsds) {
160        if (nsd.getName().matches("itnamespace\\d+")) {
161          LOG.info("Removing namespace=" + nsd.getName());
162          admin.deleteNamespace(nsd.getName());
163        }
164      }
165    }
166
167    enabledTables.clear();
168    disabledTables.clear();
169    deletedTables.clear();
170    namespaceMap.clear();
171
172    Connection connection = getConnection();
173    connection.close();
174    super.cleanUpCluster();
175  }
176
177  protected int getMinServerCount() {
178    return SERVER_COUNT;
179  }
180
181  protected synchronized void setConnection(Connection connection) {
182    this.connection = connection;
183  }
184
185  protected synchronized Connection getConnection() {
186    if (this.connection == null) {
187      try {
188        Connection connection = ConnectionFactory.createConnection(getConf());
189        setConnection(connection);
190      } catch (IOException e) {
191        LOG.error(HBaseMarkers.FATAL, "Failed to establish connection.", e);
192      }
193    }
194    return connection;
195  }
196
197  protected void verifyNamespaces() throws IOException {
198    Connection connection = getConnection();
199    Admin admin = connection.getAdmin();
200    // iterating concurrent map
201    for (String nsName : namespaceMap.keySet()) {
202      try {
203        assertTrue(admin.getNamespaceDescriptor(nsName) != null,
204          "Namespace: " + nsName + " in namespaceMap does not exist");
205      } catch (NamespaceNotFoundException nsnfe) {
206        fail("Namespace: " + nsName + " in namespaceMap does not exist: " + nsnfe.getMessage());
207      }
208    }
209    admin.close();
210  }
211
212  protected void verifyTables() throws IOException {
213    Connection connection = getConnection();
214    Admin admin = connection.getAdmin();
215    // iterating concurrent map
216    for (TableName tableName : enabledTables.keySet()) {
217      assertTrue(admin.isTableEnabled(tableName),
218        "Table: " + tableName + " in enabledTables is not enabled");
219    }
220    for (TableName tableName : disabledTables.keySet()) {
221      assertTrue(admin.isTableDisabled(tableName),
222        "Table: " + tableName + " in disabledTables is not disabled");
223    }
224    for (TableName tableName : deletedTables.keySet()) {
225      assertFalse(admin.tableExists(tableName),
226        "Table: " + tableName + " in deletedTables is not deleted");
227    }
228    admin.close();
229  }
230
231  @Test
232  public void testAsUnitTest() throws Exception {
233    runTest();
234  }
235
236  @Override
237  public int runTestFromCommandLine() throws Exception {
238    int ret = runTest();
239    return ret;
240  }
241
242  private abstract class MasterAction {
243    Connection connection = getConnection();
244
245    abstract void perform() throws IOException;
246  }
247
248  private abstract class NamespaceAction extends MasterAction {
249    final String nsTestConfigKey = "hbase.namespace.testKey";
250
251    // NamespaceAction has implemented selectNamespace() shared by multiple namespace Actions
252    protected NamespaceDescriptor
253      selectNamespace(ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap) {
254      // synchronization to prevent removal from multiple threads
255      synchronized (namespaceMap) {
256        // randomly select namespace from namespaceMap
257        if (namespaceMap.isEmpty()) {
258          return null;
259        }
260        ArrayList<String> namespaceList = new ArrayList<>(namespaceMap.keySet());
261        String randomKey =
262          namespaceList.get(ThreadLocalRandom.current().nextInt(namespaceList.size()));
263        NamespaceDescriptor randomNsd = namespaceMap.get(randomKey);
264        // remove from namespaceMap
265        namespaceMap.remove(randomKey);
266        return randomNsd;
267      }
268    }
269  }
270
271  private class CreateNamespaceAction extends NamespaceAction {
272    @Override
273    void perform() throws IOException {
274      Admin admin = connection.getAdmin();
275      try {
276        NamespaceDescriptor nsd;
277        while (true) {
278          nsd = createNamespaceDesc();
279          try {
280            if (admin.getNamespaceDescriptor(nsd.getName()) != null) {
281              // the namespace has already existed.
282              continue;
283            } else {
284              // currently, the code never return null - always throws exception if
285              // namespace is not found - this just a defensive programming to make
286              // sure null situation is handled in case the method changes in the
287              // future.
288              break;
289            }
290          } catch (NamespaceNotFoundException nsnfe) {
291            // This is expected for a random generated NamespaceDescriptor
292            break;
293          }
294        }
295        LOG.info("Creating namespace:" + nsd);
296        admin.createNamespace(nsd);
297        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(nsd.getName());
298        assertTrue(freshNamespaceDesc != null, "Namespace: " + nsd + " was not created");
299        namespaceMap.put(nsd.getName(), freshNamespaceDesc);
300        LOG.info("Created namespace:" + freshNamespaceDesc);
301      } catch (Exception e) {
302        LOG.warn("Caught exception in action: " + this.getClass());
303        throw e;
304      } finally {
305        admin.close();
306      }
307    }
308
309    private NamespaceDescriptor createNamespaceDesc() {
310      String namespaceName =
311        "itnamespace" + String.format("%010d", ThreadLocalRandom.current().nextInt());
312      NamespaceDescriptor nsd = NamespaceDescriptor.create(namespaceName).build();
313
314      nsd.setConfiguration(nsTestConfigKey,
315        String.format("%010d", ThreadLocalRandom.current().nextInt()));
316      return nsd;
317    }
318  }
319
320  private class ModifyNamespaceAction extends NamespaceAction {
321    @Override
322    void perform() throws IOException {
323      NamespaceDescriptor selected = selectNamespace(namespaceMap);
324      if (selected == null) {
325        return;
326      }
327
328      Admin admin = connection.getAdmin();
329      try {
330        String namespaceName = selected.getName();
331        LOG.info("Modifying namespace :" + selected);
332        NamespaceDescriptor modifiedNsd = NamespaceDescriptor.create(namespaceName).build();
333        String nsValueNew;
334        do {
335          nsValueNew = String.format("%010d", ThreadLocalRandom.current().nextInt());
336        } while (selected.getConfigurationValue(nsTestConfigKey).equals(nsValueNew));
337        modifiedNsd.setConfiguration(nsTestConfigKey, nsValueNew);
338        admin.modifyNamespace(modifiedNsd);
339        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(namespaceName);
340        assertTrue(freshNamespaceDesc.getConfigurationValue(nsTestConfigKey).equals(nsValueNew),
341          "Namespace: " + selected + " was not modified");
342        assertTrue(admin.getNamespaceDescriptor(namespaceName) != null,
343          "Namespace: " + namespaceName + " does not exist");
344        namespaceMap.put(namespaceName, freshNamespaceDesc);
345        LOG.info("Modified namespace :" + freshNamespaceDesc);
346      } catch (Exception e) {
347        LOG.warn("Caught exception in action: " + this.getClass());
348        throw e;
349      } finally {
350        admin.close();
351      }
352    }
353  }
354
355  private class DeleteNamespaceAction extends NamespaceAction {
356    @Override
357    void perform() throws IOException {
358      NamespaceDescriptor selected = selectNamespace(namespaceMap);
359      if (selected == null) {
360        return;
361      }
362
363      Admin admin = connection.getAdmin();
364      try {
365        String namespaceName = selected.getName();
366        LOG.info("Deleting namespace :" + selected);
367        admin.deleteNamespace(namespaceName);
368        try {
369          if (admin.getNamespaceDescriptor(namespaceName) != null) {
370            // the namespace still exists.
371            assertTrue(false, "Namespace: " + selected + " was not deleted");
372          } else {
373            LOG.info("Deleted namespace :" + selected);
374          }
375        } catch (NamespaceNotFoundException nsnfe) {
376          // This is expected result
377          LOG.info("Deleted namespace :" + selected);
378        }
379      } catch (Exception e) {
380        LOG.warn("Caught exception in action: " + this.getClass());
381        throw e;
382      } finally {
383        admin.close();
384      }
385    }
386  }
387
388  private abstract class TableAction extends MasterAction {
389    // TableAction has implemented selectTable() shared by multiple table Actions
390    protected TableDescriptor selectTable(ConcurrentHashMap<TableName, TableDescriptor> tableMap) {
391      // synchronization to prevent removal from multiple threads
392      synchronized (tableMap) {
393        // randomly select table from tableMap
394        if (tableMap.isEmpty()) {
395          return null;
396        }
397        ArrayList<TableName> tableList = new ArrayList<>(tableMap.keySet());
398        TableName key = tableList.get(ThreadLocalRandom.current().nextInt(tableList.size()));
399        TableDescriptor randomTd = tableMap.remove(key);
400        return randomTd;
401      }
402    }
403  }
404
405  private class CreateTableAction extends TableAction {
406
407    @Override
408    void perform() throws IOException {
409      Admin admin = connection.getAdmin();
410      try {
411        TableDescriptor td = createTableDesc();
412        TableName tableName = td.getTableName();
413        if (admin.tableExists(tableName)) {
414          return;
415        }
416        String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName());
417        numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
418        byte[] startKey = Bytes.toBytes("row-0000000000");
419        byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
420        LOG.info("Creating table:" + td);
421        admin.createTable(td, startKey, endKey, numRegions);
422        assertTrue(admin.tableExists(tableName), "Table: " + td + " was not created");
423        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
424        assertTrue(admin.isTableEnabled(tableName),
425          "After create, Table: " + tableName + " in not enabled");
426        enabledTables.put(tableName, freshTableDesc);
427        LOG.info("Created table:" + freshTableDesc);
428      } catch (Exception e) {
429        LOG.warn("Caught exception in action: " + this.getClass());
430        throw e;
431      } finally {
432        admin.close();
433      }
434    }
435
436    private TableDescriptor createTableDesc() {
437      String tableName =
438        String.format("ittable-%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
439      String familyName = "cf-" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
440      return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
441        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName)).build();
442    }
443  }
444
445  private class DisableTableAction extends TableAction {
446
447    @Override
448    void perform() throws IOException {
449
450      TableDescriptor selected = selectTable(enabledTables);
451      if (selected == null) {
452        return;
453      }
454
455      Admin admin = connection.getAdmin();
456      try {
457        TableName tableName = selected.getTableName();
458        LOG.info("Disabling table :" + selected);
459        admin.disableTable(tableName);
460        assertTrue(admin.isTableDisabled(tableName), "Table: " + selected + " was not disabled");
461        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
462        assertTrue(admin.isTableDisabled(tableName),
463          "After disable, Table: " + tableName + " is not disabled");
464        disabledTables.put(tableName, freshTableDesc);
465        LOG.info("Disabled table :" + freshTableDesc);
466      } catch (Exception e) {
467        LOG.warn("Caught exception in action: " + this.getClass());
468        // TODO workaround
469        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
470        // operations
471        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
472        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
473        // 2) if master failover happens in the middle of the enable/disable operation, the new
474        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
475        // AssignmentManager#recoverTableInEnablingState() and
476        // AssignmentManager#recoverTableInDisablingState()
477        // 3) after the new master initialization completes, the procedure tries to re-do the
478        // enable/disable operation, which was already done. Ignore those exceptions before change
479        // of behaviors of AssignmentManager in presence of PV2
480        if (e instanceof TableNotEnabledException) {
481          LOG.warn("Caught TableNotEnabledException in action: " + this.getClass());
482          e.printStackTrace();
483        } else {
484          throw e;
485        }
486      } finally {
487        admin.close();
488      }
489    }
490  }
491
492  private class EnableTableAction extends TableAction {
493
494    @Override
495    void perform() throws IOException {
496
497      TableDescriptor selected = selectTable(disabledTables);
498      if (selected == null) {
499        return;
500      }
501
502      Admin admin = connection.getAdmin();
503      try {
504        TableName tableName = selected.getTableName();
505        LOG.info("Enabling table :" + selected);
506        admin.enableTable(tableName);
507        assertTrue(admin.isTableEnabled(tableName), "Table: " + selected + " was not enabled");
508        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
509        assertTrue(admin.isTableEnabled(tableName),
510          "After enable, Table: " + tableName + " in not enabled");
511        enabledTables.put(tableName, freshTableDesc);
512        LOG.info("Enabled table :" + freshTableDesc);
513      } catch (Exception e) {
514        LOG.warn("Caught exception in action: " + this.getClass());
515        // TODO workaround
516        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
517        // operations 1) when enable/disable starts, the table state is changed to
518        // ENABLING/DISABLING (ZK node in 1.x), which will be further changed to ENABLED/DISABLED
519        // once the operation completes 2) if master failover happens in the middle of the
520        // enable/disable operation, the new master will try to recover the tables in
521        // ENABLING/DISABLING state, as programmed in
522        // AssignmentManager#recoverTableInEnablingState() and
523        // AssignmentManager#recoverTableInDisablingState()
524        // 3) after the new master initialization completes, the procedure tries to re-do the
525        // enable/disable operation, which was already done. Ignore those exceptions before
526        // change of behaviors of AssignmentManager in presence of PV2
527        if (e instanceof TableNotDisabledException) {
528          LOG.warn("Caught TableNotDisabledException in action: " + this.getClass());
529          e.printStackTrace();
530        } else {
531          throw e;
532        }
533      } finally {
534        admin.close();
535      }
536    }
537  }
538
539  private class DeleteTableAction extends TableAction {
540
541    @Override
542    void perform() throws IOException {
543
544      TableDescriptor selected = selectTable(disabledTables);
545      if (selected == null) {
546        return;
547      }
548
549      Admin admin = connection.getAdmin();
550      try {
551        TableName tableName = selected.getTableName();
552        LOG.info("Deleting table :" + selected);
553        admin.deleteTable(tableName);
554        assertFalse(admin.tableExists(tableName), "Table: " + selected + " was not deleted");
555        deletedTables.put(tableName, selected);
556        LOG.info("Deleted table :" + selected);
557      } catch (Exception e) {
558        LOG.warn("Caught exception in action: " + this.getClass());
559        throw e;
560      } finally {
561        admin.close();
562      }
563    }
564  }
565
566  private abstract class ColumnAction extends TableAction {
567    // ColumnAction has implemented selectFamily() shared by multiple family Actions
568    protected ColumnFamilyDescriptor selectFamily(TableDescriptor td) {
569      if (td == null) {
570        return null;
571      }
572      ColumnFamilyDescriptor[] families = td.getColumnFamilies();
573      if (families.length == 0) {
574        LOG.info("No column families in table: " + td);
575        return null;
576      }
577      return families[ThreadLocalRandom.current().nextInt(families.length)];
578    }
579  }
580
581  private class AddColumnFamilyAction extends ColumnAction {
582
583    @Override
584    void perform() throws IOException {
585      TableDescriptor selected = selectTable(disabledTables);
586      if (selected == null) {
587        return;
588      }
589
590      Admin admin = connection.getAdmin();
591      try {
592        ColumnFamilyDescriptor cfd = createFamilyDesc();
593        if (selected.hasColumnFamily(cfd.getName())) {
594          LOG.info(
595            Bytes.toString(cfd.getName()) + " already exists in table " + selected.getTableName());
596          return;
597        }
598        TableName tableName = selected.getTableName();
599        LOG.info("Adding column family: " + cfd + " to table: " + tableName);
600        admin.addColumnFamily(tableName, cfd);
601        // assertion
602        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
603        assertTrue(freshTableDesc.hasColumnFamily(cfd.getName()),
604          "Column family: " + cfd + " was not added");
605        assertTrue(admin.isTableDisabled(tableName),
606          "After add column family, Table: " + tableName + " is not disabled");
607        disabledTables.put(tableName, freshTableDesc);
608        LOG.info("Added column family: " + cfd + " to table: " + tableName);
609      } catch (Exception e) {
610        LOG.warn("Caught exception in action: " + this.getClass());
611        throw e;
612      } finally {
613        admin.close();
614      }
615    }
616
617    private ColumnFamilyDescriptor createFamilyDesc() {
618      String familyName = String.format("cf-%010d", ThreadLocalRandom.current().nextInt());
619      return ColumnFamilyDescriptorBuilder.of(familyName);
620    }
621  }
622
623  private class AlterFamilyVersionsAction extends ColumnAction {
624
625    @Override
626    void perform() throws IOException {
627      TableDescriptor selected = selectTable(disabledTables);
628      if (selected == null) {
629        return;
630      }
631      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
632      if (columnDesc == null) {
633        return;
634      }
635
636      Admin admin = connection.getAdmin();
637      int versions = ThreadLocalRandom.current().nextInt(10) + 3;
638      try {
639        TableName tableName = selected.getTableName();
640        LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions
641          + " in table: " + tableName);
642
643        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
644          .setMinVersions(versions).setMaxVersions(versions).build();
645        TableDescriptor td =
646          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
647        admin.modifyTable(td);
648
649        // assertion
650        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
651        ColumnFamilyDescriptor freshColumnDesc =
652          freshTableDesc.getColumnFamily(columnDesc.getName());
653        assertEquals(freshColumnDesc.getMaxVersions(), versions,
654          "Column family: " + columnDesc + " was not altered");
655        assertEquals(freshColumnDesc.getMinVersions(), versions,
656          "Column family: " + freshColumnDesc + " was not altered");
657        assertTrue(admin.isTableDisabled(tableName),
658          "After alter versions of column family, Table: " + tableName + " is not disabled");
659        disabledTables.put(tableName, freshTableDesc);
660        LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions
661          + " in table: " + tableName);
662      } catch (Exception e) {
663        LOG.warn("Caught exception in action: " + this.getClass());
664        throw e;
665      } finally {
666        admin.close();
667      }
668    }
669  }
670
671  private class AlterFamilyEncodingAction extends ColumnAction {
672
673    @Override
674    void perform() throws IOException {
675      TableDescriptor selected = selectTable(disabledTables);
676      if (selected == null) {
677        return;
678      }
679      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
680      if (columnDesc == null) {
681        return;
682      }
683
684      Admin admin = connection.getAdmin();
685      try {
686        TableName tableName = selected.getTableName();
687        // possible DataBlockEncoding ids
688        DataBlockEncoding[] possibleIds = { DataBlockEncoding.NONE, DataBlockEncoding.PREFIX,
689          DataBlockEncoding.DIFF, DataBlockEncoding.FAST_DIFF, DataBlockEncoding.ROW_INDEX_V1 };
690        short id = possibleIds[ThreadLocalRandom.current().nextInt(possibleIds.length)].getId();
691        LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id + " in table: "
692          + tableName);
693
694        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
695          .setDataBlockEncoding(DataBlockEncoding.getEncodingById(id)).build();
696        TableDescriptor td =
697          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
698        admin.modifyTable(td);
699
700        // assertion
701        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
702        ColumnFamilyDescriptor freshColumnDesc =
703          freshTableDesc.getColumnFamily(columnDesc.getName());
704        assertEquals(freshColumnDesc.getDataBlockEncoding().getId(), id,
705          "Encoding of column family: " + columnDesc + " was not altered");
706        assertTrue(admin.isTableDisabled(tableName),
707          "After alter encoding of column family, Table: " + tableName + " is not disabled");
708        disabledTables.put(tableName, freshTableDesc);
709        LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id
710          + " in table: " + tableName);
711      } catch (Exception e) {
712        LOG.warn("Caught exception in action: " + this.getClass());
713        throw e;
714      } finally {
715        admin.close();
716      }
717    }
718  }
719
720  private class DeleteColumnFamilyAction extends ColumnAction {
721
722    @Override
723    void perform() throws IOException {
724      TableDescriptor selected = selectTable(disabledTables);
725      ColumnFamilyDescriptor cfd = selectFamily(selected);
726      if (selected == null || cfd == null) {
727        return;
728      }
729
730      Admin admin = connection.getAdmin();
731      try {
732        if (selected.getColumnFamilyCount() < 2) {
733          LOG.info("No enough column families to delete in table " + selected.getTableName());
734          return;
735        }
736        TableName tableName = selected.getTableName();
737        LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
738        admin.deleteColumnFamily(tableName, cfd.getName());
739        // assertion
740        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
741        assertFalse(freshTableDesc.hasColumnFamily(cfd.getName()),
742          "Column family: " + cfd + " was not added");
743        assertTrue(admin.isTableDisabled(tableName),
744          "After delete column family, Table: " + tableName + " is not disabled");
745        disabledTables.put(tableName, freshTableDesc);
746        LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
747      } catch (Exception e) {
748        LOG.warn("Caught exception in action: " + this.getClass());
749        throw e;
750      } finally {
751        admin.close();
752      }
753    }
754  }
755
756  private class AddRowAction extends ColumnAction {
757    // populate tables
758    @Override
759    void perform() throws IOException {
760      TableDescriptor selected = selectTable(enabledTables);
761      if (selected == null) {
762        return;
763      }
764
765      Admin admin = connection.getAdmin();
766      TableName tableName = selected.getTableName();
767      try (Table table = connection.getTable(tableName)) {
768        ArrayList<RegionInfo> regionInfos =
769          new ArrayList<>(admin.getRegions(selected.getTableName()));
770        int numRegions = regionInfos.size();
771        // average number of rows to be added per action to each region
772        int average_rows = 1;
773        int numRows = average_rows * numRegions;
774        LOG.info("Adding " + numRows + " rows to table: " + selected);
775        byte[] value = new byte[10];
776        for (int i = 0; i < numRows; i++) {
777          // nextInt(Integer.MAX_VALUE)) to return positive numbers only
778          byte[] rowKey =
779            Bytes.toBytes("row-" + String.format("%010d", ThreadLocalRandom.current().nextInt()));
780          ColumnFamilyDescriptor cfd = selectFamily(selected);
781          if (cfd == null) {
782            return;
783          }
784          byte[] family = cfd.getName();
785          byte[] qualifier = Bytes.toBytes("col-" + ThreadLocalRandom.current().nextInt(10));
786          Bytes.random(value);
787          Put put = new Put(rowKey);
788          put.addColumn(family, qualifier, value);
789          table.put(put);
790        }
791        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
792        assertTrue(admin.isTableEnabled(tableName),
793          "After insert, Table: " + tableName + " in not enabled");
794        enabledTables.put(tableName, freshTableDesc);
795        LOG.info("Added " + numRows + " rows to table: " + selected);
796      } catch (Exception e) {
797        LOG.warn("Caught exception in action: " + this.getClass());
798        throw e;
799      } finally {
800        admin.close();
801      }
802    }
803  }
804
805  private enum ACTION {
806    CREATE_NAMESPACE,
807    MODIFY_NAMESPACE,
808    DELETE_NAMESPACE,
809    CREATE_TABLE,
810    DISABLE_TABLE,
811    ENABLE_TABLE,
812    DELETE_TABLE,
813    ADD_COLUMNFAMILY,
814    DELETE_COLUMNFAMILY,
815    ALTER_FAMILYVERSIONS,
816    ALTER_FAMILYENCODING,
817    ADD_ROW
818  }
819
820  private class Worker extends Thread {
821
822    private Exception savedException;
823
824    private ACTION action;
825
826    @Override
827    public void run() {
828      while (running.get()) {
829        // select random action
830        ACTION selectedAction =
831          ACTION.values()[ThreadLocalRandom.current().nextInt(ACTION.values().length)];
832        this.action = selectedAction;
833        LOG.info("Performing Action: " + selectedAction);
834
835        try {
836          switch (selectedAction) {
837            case CREATE_NAMESPACE:
838              new CreateNamespaceAction().perform();
839              break;
840            case MODIFY_NAMESPACE:
841              new ModifyNamespaceAction().perform();
842              break;
843            case DELETE_NAMESPACE:
844              new DeleteNamespaceAction().perform();
845              break;
846            case CREATE_TABLE:
847              // stop creating new tables in the later stage of the test to avoid too many empty
848              // tables
849              if (create_table.get()) {
850                new CreateTableAction().perform();
851              }
852              break;
853            case ADD_ROW:
854              new AddRowAction().perform();
855              break;
856            case DISABLE_TABLE:
857              new DisableTableAction().perform();
858              break;
859            case ENABLE_TABLE:
860              new EnableTableAction().perform();
861              break;
862            case DELETE_TABLE:
863              // reduce probability of deleting table to 20%
864              if (ThreadLocalRandom.current().nextInt(100) < 20) {
865                new DeleteTableAction().perform();
866              }
867              break;
868            case ADD_COLUMNFAMILY:
869              new AddColumnFamilyAction().perform();
870              break;
871            case DELETE_COLUMNFAMILY:
872              // reduce probability of deleting column family to 20%
873              if (ThreadLocalRandom.current().nextInt(100) < 20) {
874                new DeleteColumnFamilyAction().perform();
875              }
876              break;
877            case ALTER_FAMILYVERSIONS:
878              new AlterFamilyVersionsAction().perform();
879              break;
880            case ALTER_FAMILYENCODING:
881              new AlterFamilyEncodingAction().perform();
882              break;
883          }
884        } catch (Exception ex) {
885          this.savedException = ex;
886          return;
887        }
888      }
889      LOG.info(this.getName() + " stopped");
890    }
891
892    public Exception getSavedException() {
893      return this.savedException;
894    }
895
896    public ACTION getAction() {
897      return this.action;
898    }
899  }
900
901  private void checkException(List<Worker> workers) {
902    if (workers == null || workers.isEmpty()) return;
903    for (Worker worker : workers) {
904      Exception e = worker.getSavedException();
905      if (e != null) {
906        LOG.error("Found exception in thread: " + worker.getName(), e);
907      }
908      assertNull(e, "Action failed: " + worker.getAction() + " in thread: " + worker.getName());
909    }
910  }
911
912  private int runTest() throws Exception {
913    LOG.info("Starting the test");
914
915    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
916    long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);
917
918    String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
919    numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);
920
921    ArrayList<Worker> workers = new ArrayList<>(numThreads);
922    for (int i = 0; i < numThreads; i++) {
923      checkException(workers);
924      Worker worker = new Worker();
925      LOG.info("Launching worker thread " + worker.getName());
926      workers.add(worker);
927      worker.start();
928    }
929
930    Threads.sleep(runtime / 2);
931    LOG.info("Stopping creating new tables");
932    create_table.set(false);
933    Threads.sleep(runtime / 2);
934    LOG.info("Runtime is up");
935    running.set(false);
936
937    checkException(workers);
938
939    for (Worker worker : workers) {
940      worker.join();
941    }
942    LOG.info("All Worker threads stopped");
943
944    // verify
945    LOG.info("Verify actions of all threads succeeded");
946    checkException(workers);
947    LOG.info("Verify namespaces");
948    verifyNamespaces();
949    LOG.info("Verify states of all tables");
950    verifyTables();
951
952    // RUN HBCK
953
954    HBaseFsck hbck = null;
955    try {
956      LOG.info("Running hbck");
957      hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
958      if (HbckTestingUtil.inconsistencyFound(hbck)) {
959        // Find the inconsistency during HBCK. Leave table and namespace undropped so that
960        // we can check outside the test.
961        keepObjectsAtTheEnd = true;
962      }
963      HbckTestingUtil.assertNoErrors(hbck);
964      LOG.info("Finished hbck");
965    } finally {
966      if (hbck != null) {
967        hbck.close();
968      }
969    }
970    return 0;
971  }
972
973  @Override
974  public TableName getTablename() {
975    return null; // This test is not inteded to run with stock Chaos Monkey
976  }
977
978  @Override
979  protected Set<String> getColumnFamilies() {
980    return null; // This test is not inteded to run with stock Chaos Monkey
981  }
982
983  public static void main(String[] args) throws Exception {
984    Configuration conf = HBaseConfiguration.create();
985    IntegrationTestingUtility.setUseDistributedCluster(conf);
986    IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
987    Connection connection = null;
988    int ret = 1;
989    try {
990      // Initialize connection once, then pass to Actions
991      LOG.debug("Setting up connection ...");
992      connection = ConnectionFactory.createConnection(conf);
993      masterFailover.setConnection(connection);
994      ret = ToolRunner.run(conf, masterFailover, args);
995    } catch (IOException e) {
996      LOG.error(HBaseMarkers.FATAL, "Failed to establish connection. Aborting test ...", e);
997    } finally {
998      connection = masterFailover.getConnection();
999      if (connection != null) {
1000        connection.close();
1001      }
1002      System.exit(ret);
1003    }
1004  }
1005}