001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.client.Admin;
029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
030import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.client.ConnectionFactory;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
037import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
038import org.apache.hadoop.hbase.log.HBaseMarkers;
039import org.apache.hadoop.hbase.testclassification.IntegrationTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.HBaseFsck;
042import org.apache.hadoop.hbase.util.Threads;
043import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
044import org.apache.hadoop.util.ToolRunner;
045import org.junit.Assert;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Integration test that verifies Procedure V2. DDL operations should go through (rollforward or
053 * rollback) when primary master is killed by ChaosMonkey (default MASTER_KILLING).
054 * <p>
055 * </p>
 * Multiple Worker threads are started; each repeatedly performs a randomly chosen Action from the
 * groups below. Actions that create, disable, enable, delete and populate tables:
058 * <ul>
059 * <li>CreateTableAction</li>
060 * <li>DisableTableAction</li>
061 * <li>EnableTableAction</li>
062 * <li>DeleteTableAction</li>
063 * <li>AddRowAction</li>
064 * </ul>
065 * Actions performing column family DDL operations:
066 * <ul>
067 * <li>AddColumnFamilyAction</li>
 * <li>AlterFamilyVersionsAction</li>
 * <li>AlterFamilyEncodingAction</li>
070 * <li>DeleteColumnFamilyAction</li>
071 * </ul>
072 * Actions performing namespace DDL operations:
073 * <ul>
 * <li>CreateNamespaceAction</li>
 * <li>ModifyNamespaceAction</li>
076 * <li>DeleteNamespaceAction</li>
077 * </ul>
078 * <br/>
 * The threads run for a period of time (default 20 minutes) and are stopped once that runtime has
 * elapsed. Verification is then performed against these checkpoints:
081 * <ol>
082 * <li>No Actions throw Exceptions.</li>
083 * <li>No inconsistencies are detected in hbck.</li>
084 * </ol>
085 * <p>
086 * This test should be run by the hbase user since it invokes hbck at the end
087 * </p>
088 * <p>
089 * Usage: hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
090 * -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
091 * -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
092 * -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
093 */
094
095@Category(IntegrationTests.class)
096public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {
097
  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestDDLMasterFailover.class);

  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster

  /** Default run time in milliseconds (20 minutes). */
  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;

  /** Default for the numThreads setting. */
  protected static final int DEFAULT_NUM_THREADS = 20;

  protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables

  /** When true, cleanUpCluster() leaves the test tables and namespaces in place. */
  private boolean keepObjectsAtTheEnd = false;
  /** Cluster under test; assigned in setUpCluster(). */
  protected HBaseCluster cluster;

  /** Lazily created shared connection; access it through getConnection()/setConnection(). */
  protected Connection connection;

  /**
   * A soft limit on how long we should run
   */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
  // Configuration key templates: "%s" is filled in via String.format with a class simple name.
  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";

  /** Worker threads keep looping while this flag is true. */
  protected AtomicBoolean running = new AtomicBoolean(true);

  // NOTE(review): not referenced in the visible part of this file -- confirm it is still used.
  protected AtomicBoolean create_table = new AtomicBoolean(true);

  // Effective thread/region counts; numRegions is (re)assigned from configuration by
  // CreateTableAction before each table creation.
  protected int numThreads, numRegions;

  // Bookkeeping of objects created by this test. An action removes the entry it works on
  // (selectNamespace/selectTable) and puts it back (possibly into another map) when done;
  // verifyNamespaces()/verifyTables() check these maps against the cluster.
  ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap = new ConcurrentHashMap<>();

  // Tables expected to be enabled.
  ConcurrentHashMap<TableName, TableDescriptor> enabledTables = new ConcurrentHashMap<>();

  // Tables expected to be disabled.
  ConcurrentHashMap<TableName, TableDescriptor> disabledTables = new ConcurrentHashMap<>();

  // Tables deleted by DeleteTableAction; expected to no longer exist.
  ConcurrentHashMap<TableName, TableDescriptor> deletedTables = new ConcurrentHashMap<>();
133
134  @Override
135  public void setUpCluster() throws Exception {
136    util = getTestingUtil(getConf());
137    LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
138    util.initializeCluster(getMinServerCount());
139    LOG.debug("Done initializing/checking cluster");
140    cluster = util.getHBaseClusterInterface();
141  }
142
143  @Override
144  public void cleanUpCluster() throws Exception {
145    if (!keepObjectsAtTheEnd) {
146      Admin admin = util.getAdmin();
147      admin.disableTables("ittable-\\d+");
148      admin.deleteTables("ittable-\\d+");
149      NamespaceDescriptor[] nsds = admin.listNamespaceDescriptors();
150      for (NamespaceDescriptor nsd : nsds) {
151        if (nsd.getName().matches("itnamespace\\d+")) {
152          LOG.info("Removing namespace=" + nsd.getName());
153          admin.deleteNamespace(nsd.getName());
154        }
155      }
156    }
157
158    enabledTables.clear();
159    disabledTables.clear();
160    deletedTables.clear();
161    namespaceMap.clear();
162
163    Connection connection = getConnection();
164    connection.close();
165    super.cleanUpCluster();
166  }
167
168  protected int getMinServerCount() {
169    return SERVER_COUNT;
170  }
171
172  protected synchronized void setConnection(Connection connection) {
173    this.connection = connection;
174  }
175
176  protected synchronized Connection getConnection() {
177    if (this.connection == null) {
178      try {
179        Connection connection = ConnectionFactory.createConnection(getConf());
180        setConnection(connection);
181      } catch (IOException e) {
182        LOG.error(HBaseMarkers.FATAL, "Failed to establish connection.", e);
183      }
184    }
185    return connection;
186  }
187
188  protected void verifyNamespaces() throws IOException {
189    Connection connection = getConnection();
190    Admin admin = connection.getAdmin();
191    // iterating concurrent map
192    for (String nsName : namespaceMap.keySet()) {
193      try {
194        Assert.assertTrue("Namespace: " + nsName + " in namespaceMap does not exist",
195          admin.getNamespaceDescriptor(nsName) != null);
196      } catch (NamespaceNotFoundException nsnfe) {
197        Assert
198          .fail("Namespace: " + nsName + " in namespaceMap does not exist: " + nsnfe.getMessage());
199      }
200    }
201    admin.close();
202  }
203
204  protected void verifyTables() throws IOException {
205    Connection connection = getConnection();
206    Admin admin = connection.getAdmin();
207    // iterating concurrent map
208    for (TableName tableName : enabledTables.keySet()) {
209      Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
210        admin.isTableEnabled(tableName));
211    }
212    for (TableName tableName : disabledTables.keySet()) {
213      Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
214        admin.isTableDisabled(tableName));
215    }
216    for (TableName tableName : deletedTables.keySet()) {
217      Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
218        admin.tableExists(tableName));
219    }
220    admin.close();
221  }
222
223  @Test
224  public void testAsUnitTest() throws Exception {
225    runTest();
226  }
227
228  @Override
229  public int runTestFromCommandLine() throws Exception {
230    int ret = runTest();
231    return ret;
232  }
233
234  private abstract class MasterAction {
235    Connection connection = getConnection();
236
237    abstract void perform() throws IOException;
238  }
239
240  private abstract class NamespaceAction extends MasterAction {
241    final String nsTestConfigKey = "hbase.namespace.testKey";
242
243    // NamespaceAction has implemented selectNamespace() shared by multiple namespace Actions
244    protected NamespaceDescriptor
245      selectNamespace(ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap) {
246      // synchronization to prevent removal from multiple threads
247      synchronized (namespaceMap) {
248        // randomly select namespace from namespaceMap
249        if (namespaceMap.isEmpty()) {
250          return null;
251        }
252        ArrayList<String> namespaceList = new ArrayList<>(namespaceMap.keySet());
253        String randomKey =
254          namespaceList.get(ThreadLocalRandom.current().nextInt(namespaceList.size()));
255        NamespaceDescriptor randomNsd = namespaceMap.get(randomKey);
256        // remove from namespaceMap
257        namespaceMap.remove(randomKey);
258        return randomNsd;
259      }
260    }
261  }
262
263  private class CreateNamespaceAction extends NamespaceAction {
264    @Override
265    void perform() throws IOException {
266      Admin admin = connection.getAdmin();
267      try {
268        NamespaceDescriptor nsd;
269        while (true) {
270          nsd = createNamespaceDesc();
271          try {
272            if (admin.getNamespaceDescriptor(nsd.getName()) != null) {
273              // the namespace has already existed.
274              continue;
275            } else {
276              // currently, the code never return null - always throws exception if
277              // namespace is not found - this just a defensive programming to make
278              // sure null situation is handled in case the method changes in the
279              // future.
280              break;
281            }
282          } catch (NamespaceNotFoundException nsnfe) {
283            // This is expected for a random generated NamespaceDescriptor
284            break;
285          }
286        }
287        LOG.info("Creating namespace:" + nsd);
288        admin.createNamespace(nsd);
289        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(nsd.getName());
290        Assert.assertTrue("Namespace: " + nsd + " was not created", freshNamespaceDesc != null);
291        namespaceMap.put(nsd.getName(), freshNamespaceDesc);
292        LOG.info("Created namespace:" + freshNamespaceDesc);
293      } catch (Exception e) {
294        LOG.warn("Caught exception in action: " + this.getClass());
295        throw e;
296      } finally {
297        admin.close();
298      }
299    }
300
301    private NamespaceDescriptor createNamespaceDesc() {
302      String namespaceName =
303        "itnamespace" + String.format("%010d", ThreadLocalRandom.current().nextInt());
304      NamespaceDescriptor nsd = NamespaceDescriptor.create(namespaceName).build();
305
306      nsd.setConfiguration(nsTestConfigKey,
307        String.format("%010d", ThreadLocalRandom.current().nextInt()));
308      return nsd;
309    }
310  }
311
312  private class ModifyNamespaceAction extends NamespaceAction {
313    @Override
314    void perform() throws IOException {
315      NamespaceDescriptor selected = selectNamespace(namespaceMap);
316      if (selected == null) {
317        return;
318      }
319
320      Admin admin = connection.getAdmin();
321      try {
322        String namespaceName = selected.getName();
323        LOG.info("Modifying namespace :" + selected);
324        NamespaceDescriptor modifiedNsd = NamespaceDescriptor.create(namespaceName).build();
325        String nsValueNew;
326        do {
327          nsValueNew = String.format("%010d", ThreadLocalRandom.current().nextInt());
328        } while (selected.getConfigurationValue(nsTestConfigKey).equals(nsValueNew));
329        modifiedNsd.setConfiguration(nsTestConfigKey, nsValueNew);
330        admin.modifyNamespace(modifiedNsd);
331        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(namespaceName);
332        Assert.assertTrue("Namespace: " + selected + " was not modified",
333          freshNamespaceDesc.getConfigurationValue(nsTestConfigKey).equals(nsValueNew));
334        Assert.assertTrue("Namespace: " + namespaceName + " does not exist",
335          admin.getNamespaceDescriptor(namespaceName) != null);
336        namespaceMap.put(namespaceName, freshNamespaceDesc);
337        LOG.info("Modified namespace :" + freshNamespaceDesc);
338      } catch (Exception e) {
339        LOG.warn("Caught exception in action: " + this.getClass());
340        throw e;
341      } finally {
342        admin.close();
343      }
344    }
345  }
346
347  private class DeleteNamespaceAction extends NamespaceAction {
348    @Override
349    void perform() throws IOException {
350      NamespaceDescriptor selected = selectNamespace(namespaceMap);
351      if (selected == null) {
352        return;
353      }
354
355      Admin admin = connection.getAdmin();
356      try {
357        String namespaceName = selected.getName();
358        LOG.info("Deleting namespace :" + selected);
359        admin.deleteNamespace(namespaceName);
360        try {
361          if (admin.getNamespaceDescriptor(namespaceName) != null) {
362            // the namespace still exists.
363            Assert.assertTrue("Namespace: " + selected + " was not deleted", false);
364          } else {
365            LOG.info("Deleted namespace :" + selected);
366          }
367        } catch (NamespaceNotFoundException nsnfe) {
368          // This is expected result
369          LOG.info("Deleted namespace :" + selected);
370        }
371      } catch (Exception e) {
372        LOG.warn("Caught exception in action: " + this.getClass());
373        throw e;
374      } finally {
375        admin.close();
376      }
377    }
378  }
379
380  private abstract class TableAction extends MasterAction {
381    // TableAction has implemented selectTable() shared by multiple table Actions
382    protected TableDescriptor selectTable(ConcurrentHashMap<TableName, TableDescriptor> tableMap) {
383      // synchronization to prevent removal from multiple threads
384      synchronized (tableMap) {
385        // randomly select table from tableMap
386        if (tableMap.isEmpty()) {
387          return null;
388        }
389        ArrayList<TableName> tableList = new ArrayList<>(tableMap.keySet());
390        TableName key = tableList.get(ThreadLocalRandom.current().nextInt(tableList.size()));
391        TableDescriptor randomTd = tableMap.remove(key);
392        return randomTd;
393      }
394    }
395  }
396
397  private class CreateTableAction extends TableAction {
398
399    @Override
400    void perform() throws IOException {
401      Admin admin = connection.getAdmin();
402      try {
403        TableDescriptor td = createTableDesc();
404        TableName tableName = td.getTableName();
405        if (admin.tableExists(tableName)) {
406          return;
407        }
408        String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName());
409        numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
410        byte[] startKey = Bytes.toBytes("row-0000000000");
411        byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
412        LOG.info("Creating table:" + td);
413        admin.createTable(td, startKey, endKey, numRegions);
414        Assert.assertTrue("Table: " + td + " was not created", admin.tableExists(tableName));
415        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
416        Assert.assertTrue("After create, Table: " + tableName + " in not enabled",
417          admin.isTableEnabled(tableName));
418        enabledTables.put(tableName, freshTableDesc);
419        LOG.info("Created table:" + freshTableDesc);
420      } catch (Exception e) {
421        LOG.warn("Caught exception in action: " + this.getClass());
422        throw e;
423      } finally {
424        admin.close();
425      }
426    }
427
428    private TableDescriptor createTableDesc() {
429      String tableName =
430        String.format("ittable-%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
431      String familyName = "cf-" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
432      return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
433        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName)).build();
434    }
435  }
436
437  private class DisableTableAction extends TableAction {
438
439    @Override
440    void perform() throws IOException {
441
442      TableDescriptor selected = selectTable(enabledTables);
443      if (selected == null) {
444        return;
445      }
446
447      Admin admin = connection.getAdmin();
448      try {
449        TableName tableName = selected.getTableName();
450        LOG.info("Disabling table :" + selected);
451        admin.disableTable(tableName);
452        Assert.assertTrue("Table: " + selected + " was not disabled",
453          admin.isTableDisabled(tableName));
454        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
455        Assert.assertTrue("After disable, Table: " + tableName + " is not disabled",
456          admin.isTableDisabled(tableName));
457        disabledTables.put(tableName, freshTableDesc);
458        LOG.info("Disabled table :" + freshTableDesc);
459      } catch (Exception e) {
460        LOG.warn("Caught exception in action: " + this.getClass());
461        // TODO workaround
462        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
463        // operations
464        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
465        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
466        // 2) if master failover happens in the middle of the enable/disable operation, the new
467        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
468        // AssignmentManager#recoverTableInEnablingState() and
469        // AssignmentManager#recoverTableInDisablingState()
470        // 3) after the new master initialization completes, the procedure tries to re-do the
471        // enable/disable operation, which was already done. Ignore those exceptions before change
472        // of behaviors of AssignmentManager in presence of PV2
473        if (e instanceof TableNotEnabledException) {
474          LOG.warn("Caught TableNotEnabledException in action: " + this.getClass());
475          e.printStackTrace();
476        } else {
477          throw e;
478        }
479      } finally {
480        admin.close();
481      }
482    }
483  }
484
485  private class EnableTableAction extends TableAction {
486
487    @Override
488    void perform() throws IOException {
489
490      TableDescriptor selected = selectTable(disabledTables);
491      if (selected == null) {
492        return;
493      }
494
495      Admin admin = connection.getAdmin();
496      try {
497        TableName tableName = selected.getTableName();
498        LOG.info("Enabling table :" + selected);
499        admin.enableTable(tableName);
500        Assert.assertTrue("Table: " + selected + " was not enabled",
501          admin.isTableEnabled(tableName));
502        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
503        Assert.assertTrue("After enable, Table: " + tableName + " in not enabled",
504          admin.isTableEnabled(tableName));
505        enabledTables.put(tableName, freshTableDesc);
506        LOG.info("Enabled table :" + freshTableDesc);
507      } catch (Exception e) {
508        LOG.warn("Caught exception in action: " + this.getClass());
509        // TODO workaround
510        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
511        // operations 1) when enable/disable starts, the table state is changed to
512        // ENABLING/DISABLING (ZK node in 1.x), which will be further changed to ENABLED/DISABLED
513        // once the operation completes 2) if master failover happens in the middle of the
514        // enable/disable operation, the new master will try to recover the tables in
515        // ENABLING/DISABLING state, as programmed in
516        // AssignmentManager#recoverTableInEnablingState() and
517        // AssignmentManager#recoverTableInDisablingState()
518        // 3) after the new master initialization completes, the procedure tries to re-do the
519        // enable/disable operation, which was already done. Ignore those exceptions before
520        // change of behaviors of AssignmentManager in presence of PV2
521        if (e instanceof TableNotDisabledException) {
522          LOG.warn("Caught TableNotDisabledException in action: " + this.getClass());
523          e.printStackTrace();
524        } else {
525          throw e;
526        }
527      } finally {
528        admin.close();
529      }
530    }
531  }
532
533  private class DeleteTableAction extends TableAction {
534
535    @Override
536    void perform() throws IOException {
537
538      TableDescriptor selected = selectTable(disabledTables);
539      if (selected == null) {
540        return;
541      }
542
543      Admin admin = connection.getAdmin();
544      try {
545        TableName tableName = selected.getTableName();
546        LOG.info("Deleting table :" + selected);
547        admin.deleteTable(tableName);
548        Assert.assertFalse("Table: " + selected + " was not deleted", admin.tableExists(tableName));
549        deletedTables.put(tableName, selected);
550        LOG.info("Deleted table :" + selected);
551      } catch (Exception e) {
552        LOG.warn("Caught exception in action: " + this.getClass());
553        throw e;
554      } finally {
555        admin.close();
556      }
557    }
558  }
559
560  private abstract class ColumnAction extends TableAction {
561    // ColumnAction has implemented selectFamily() shared by multiple family Actions
562    protected ColumnFamilyDescriptor selectFamily(TableDescriptor td) {
563      if (td == null) {
564        return null;
565      }
566      ColumnFamilyDescriptor[] families = td.getColumnFamilies();
567      if (families.length == 0) {
568        LOG.info("No column families in table: " + td);
569        return null;
570      }
571      return families[ThreadLocalRandom.current().nextInt(families.length)];
572    }
573  }
574
575  private class AddColumnFamilyAction extends ColumnAction {
576
577    @Override
578    void perform() throws IOException {
579      TableDescriptor selected = selectTable(disabledTables);
580      if (selected == null) {
581        return;
582      }
583
584      Admin admin = connection.getAdmin();
585      try {
586        ColumnFamilyDescriptor cfd = createFamilyDesc();
587        if (selected.hasColumnFamily(cfd.getName())) {
588          LOG.info(
589            Bytes.toString(cfd.getName()) + " already exists in table " + selected.getTableName());
590          return;
591        }
592        TableName tableName = selected.getTableName();
593        LOG.info("Adding column family: " + cfd + " to table: " + tableName);
594        admin.addColumnFamily(tableName, cfd);
595        // assertion
596        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
597        Assert.assertTrue("Column family: " + cfd + " was not added",
598          freshTableDesc.hasColumnFamily(cfd.getName()));
599        Assert.assertTrue("After add column family, Table: " + tableName + " is not disabled",
600          admin.isTableDisabled(tableName));
601        disabledTables.put(tableName, freshTableDesc);
602        LOG.info("Added column family: " + cfd + " to table: " + tableName);
603      } catch (Exception e) {
604        LOG.warn("Caught exception in action: " + this.getClass());
605        throw e;
606      } finally {
607        admin.close();
608      }
609    }
610
611    private ColumnFamilyDescriptor createFamilyDesc() {
612      String familyName = String.format("cf-%010d", ThreadLocalRandom.current().nextInt());
613      return ColumnFamilyDescriptorBuilder.of(familyName);
614    }
615  }
616
617  private class AlterFamilyVersionsAction extends ColumnAction {
618
619    @Override
620    void perform() throws IOException {
621      TableDescriptor selected = selectTable(disabledTables);
622      if (selected == null) {
623        return;
624      }
625      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
626      if (columnDesc == null) {
627        return;
628      }
629
630      Admin admin = connection.getAdmin();
631      int versions = ThreadLocalRandom.current().nextInt(10) + 3;
632      try {
633        TableName tableName = selected.getTableName();
634        LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions
635          + " in table: " + tableName);
636
637        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
638          .setMinVersions(versions).setMaxVersions(versions).build();
639        TableDescriptor td =
640          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
641        admin.modifyTable(td);
642
643        // assertion
644        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
645        ColumnFamilyDescriptor freshColumnDesc =
646          freshTableDesc.getColumnFamily(columnDesc.getName());
647        Assert.assertEquals("Column family: " + columnDesc + " was not altered",
648          freshColumnDesc.getMaxVersions(), versions);
649        Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered",
650          freshColumnDesc.getMinVersions(), versions);
651        Assert.assertTrue(
652          "After alter versions of column family, Table: " + tableName + " is not disabled",
653          admin.isTableDisabled(tableName));
654        disabledTables.put(tableName, freshTableDesc);
655        LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions
656          + " in table: " + tableName);
657      } catch (Exception e) {
658        LOG.warn("Caught exception in action: " + this.getClass());
659        throw e;
660      } finally {
661        admin.close();
662      }
663    }
664  }
665
666  private class AlterFamilyEncodingAction extends ColumnAction {
667
668    @Override
669    void perform() throws IOException {
670      TableDescriptor selected = selectTable(disabledTables);
671      if (selected == null) {
672        return;
673      }
674      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
675      if (columnDesc == null) {
676        return;
677      }
678
679      Admin admin = connection.getAdmin();
680      try {
681        TableName tableName = selected.getTableName();
682        // possible DataBlockEncoding ids
683        DataBlockEncoding[] possibleIds = { DataBlockEncoding.NONE, DataBlockEncoding.PREFIX,
684          DataBlockEncoding.DIFF, DataBlockEncoding.FAST_DIFF, DataBlockEncoding.ROW_INDEX_V1 };
685        short id = possibleIds[ThreadLocalRandom.current().nextInt(possibleIds.length)].getId();
686        LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id + " in table: "
687          + tableName);
688
689        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
690          .setDataBlockEncoding(DataBlockEncoding.getEncodingById(id)).build();
691        TableDescriptor td =
692          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
693        admin.modifyTable(td);
694
695        // assertion
696        TableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
697        ColumnFamilyDescriptor freshColumnDesc =
698          freshTableDesc.getColumnFamily(columnDesc.getName());
699        Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered",
700          freshColumnDesc.getDataBlockEncoding().getId(), id);
701        Assert.assertTrue(
702          "After alter encoding of column family, Table: " + tableName + " is not disabled",
703          admin.isTableDisabled(tableName));
704        disabledTables.put(tableName, freshTableDesc);
705        LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id
706          + " in table: " + tableName);
707      } catch (Exception e) {
708        LOG.warn("Caught exception in action: " + this.getClass());
709        throw e;
710      } finally {
711        admin.close();
712      }
713    }
714  }
715
716  private class DeleteColumnFamilyAction extends ColumnAction {
717
718    @Override
719    void perform() throws IOException {
720      TableDescriptor selected = selectTable(disabledTables);
721      ColumnFamilyDescriptor cfd = selectFamily(selected);
722      if (selected == null || cfd == null) {
723        return;
724      }
725
726      Admin admin = connection.getAdmin();
727      try {
728        if (selected.getColumnFamilyCount() < 2) {
729          LOG.info("No enough column families to delete in table " + selected.getTableName());
730          return;
731        }
732        TableName tableName = selected.getTableName();
733        LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
734        admin.deleteColumnFamily(tableName, cfd.getName());
735        // assertion
736        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
737        Assert.assertFalse("Column family: " + cfd + " was not added",
738          freshTableDesc.hasColumnFamily(cfd.getName()));
739        Assert.assertTrue("After delete column family, Table: " + tableName + " is not disabled",
740          admin.isTableDisabled(tableName));
741        disabledTables.put(tableName, freshTableDesc);
742        LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
743      } catch (Exception e) {
744        LOG.warn("Caught exception in action: " + this.getClass());
745        throw e;
746      } finally {
747        admin.close();
748      }
749    }
750  }
751
752  private class AddRowAction extends ColumnAction {
753    // populate tables
754    @Override
755    void perform() throws IOException {
756      TableDescriptor selected = selectTable(enabledTables);
757      if (selected == null) {
758        return;
759      }
760
761      Admin admin = connection.getAdmin();
762      TableName tableName = selected.getTableName();
763      try (Table table = connection.getTable(tableName)) {
764        ArrayList<HRegionInfo> regionInfos =
765          new ArrayList<>(admin.getTableRegions(selected.getTableName()));
766        int numRegions = regionInfos.size();
767        // average number of rows to be added per action to each region
768        int average_rows = 1;
769        int numRows = average_rows * numRegions;
770        LOG.info("Adding " + numRows + " rows to table: " + selected);
771        byte[] value = new byte[10];
772        for (int i = 0; i < numRows; i++) {
773          // nextInt(Integer.MAX_VALUE)) to return positive numbers only
774          byte[] rowKey =
775            Bytes.toBytes("row-" + String.format("%010d", ThreadLocalRandom.current().nextInt()));
776          ColumnFamilyDescriptor cfd = selectFamily(selected);
777          if (cfd == null) {
778            return;
779          }
780          byte[] family = cfd.getName();
781          byte[] qualifier = Bytes.toBytes("col-" + ThreadLocalRandom.current().nextInt(10));
782          Bytes.random(value);
783          Put put = new Put(rowKey);
784          put.addColumn(family, qualifier, value);
785          table.put(put);
786        }
787        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
788        Assert.assertTrue("After insert, Table: " + tableName + " in not enabled",
789          admin.isTableEnabled(tableName));
790        enabledTables.put(tableName, freshTableDesc);
791        LOG.info("Added " + numRows + " rows to table: " + selected);
792      } catch (Exception e) {
793        LOG.warn("Caught exception in action: " + this.getClass());
794        throw e;
795      } finally {
796        admin.close();
797      }
798    }
799  }
800
801  private enum ACTION {
802    CREATE_NAMESPACE,
803    MODIFY_NAMESPACE,
804    DELETE_NAMESPACE,
805    CREATE_TABLE,
806    DISABLE_TABLE,
807    ENABLE_TABLE,
808    DELETE_TABLE,
809    ADD_COLUMNFAMILY,
810    DELETE_COLUMNFAMILY,
811    ALTER_FAMILYVERSIONS,
812    ALTER_FAMILYENCODING,
813    ADD_ROW
814  }
815
816  private class Worker extends Thread {
817
818    private Exception savedException;
819
820    private ACTION action;
821
822    @Override
823    public void run() {
824      while (running.get()) {
825        // select random action
826        ACTION selectedAction =
827          ACTION.values()[ThreadLocalRandom.current().nextInt(ACTION.values().length)];
828        this.action = selectedAction;
829        LOG.info("Performing Action: " + selectedAction);
830
831        try {
832          switch (selectedAction) {
833            case CREATE_NAMESPACE:
834              new CreateNamespaceAction().perform();
835              break;
836            case MODIFY_NAMESPACE:
837              new ModifyNamespaceAction().perform();
838              break;
839            case DELETE_NAMESPACE:
840              new DeleteNamespaceAction().perform();
841              break;
842            case CREATE_TABLE:
843              // stop creating new tables in the later stage of the test to avoid too many empty
844              // tables
845              if (create_table.get()) {
846                new CreateTableAction().perform();
847              }
848              break;
849            case ADD_ROW:
850              new AddRowAction().perform();
851              break;
852            case DISABLE_TABLE:
853              new DisableTableAction().perform();
854              break;
855            case ENABLE_TABLE:
856              new EnableTableAction().perform();
857              break;
858            case DELETE_TABLE:
859              // reduce probability of deleting table to 20%
860              if (ThreadLocalRandom.current().nextInt(100) < 20) {
861                new DeleteTableAction().perform();
862              }
863              break;
864            case ADD_COLUMNFAMILY:
865              new AddColumnFamilyAction().perform();
866              break;
867            case DELETE_COLUMNFAMILY:
868              // reduce probability of deleting column family to 20%
869              if (ThreadLocalRandom.current().nextInt(100) < 20) {
870                new DeleteColumnFamilyAction().perform();
871              }
872              break;
873            case ALTER_FAMILYVERSIONS:
874              new AlterFamilyVersionsAction().perform();
875              break;
876            case ALTER_FAMILYENCODING:
877              new AlterFamilyEncodingAction().perform();
878              break;
879          }
880        } catch (Exception ex) {
881          this.savedException = ex;
882          return;
883        }
884      }
885      LOG.info(this.getName() + " stopped");
886    }
887
888    public Exception getSavedException() {
889      return this.savedException;
890    }
891
892    public ACTION getAction() {
893      return this.action;
894    }
895  }
896
897  private void checkException(List<Worker> workers) {
898    if (workers == null || workers.isEmpty()) return;
899    for (Worker worker : workers) {
900      Exception e = worker.getSavedException();
901      if (e != null) {
902        LOG.error("Found exception in thread: " + worker.getName());
903        e.printStackTrace();
904      }
905      Assert.assertNull("Action failed: " + worker.getAction() + " in thread: " + worker.getName(),
906        e);
907    }
908  }
909
910  private int runTest() throws Exception {
911    LOG.info("Starting the test");
912
913    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
914    long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);
915
916    String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
917    numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);
918
919    ArrayList<Worker> workers = new ArrayList<>(numThreads);
920    for (int i = 0; i < numThreads; i++) {
921      checkException(workers);
922      Worker worker = new Worker();
923      LOG.info("Launching worker thread " + worker.getName());
924      workers.add(worker);
925      worker.start();
926    }
927
928    Threads.sleep(runtime / 2);
929    LOG.info("Stopping creating new tables");
930    create_table.set(false);
931    Threads.sleep(runtime / 2);
932    LOG.info("Runtime is up");
933    running.set(false);
934
935    checkException(workers);
936
937    for (Worker worker : workers) {
938      worker.join();
939    }
940    LOG.info("All Worker threads stopped");
941
942    // verify
943    LOG.info("Verify actions of all threads succeeded");
944    checkException(workers);
945    LOG.info("Verify namespaces");
946    verifyNamespaces();
947    LOG.info("Verify states of all tables");
948    verifyTables();
949
950    // RUN HBCK
951
952    HBaseFsck hbck = null;
953    try {
954      LOG.info("Running hbck");
955      hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
956      if (HbckTestingUtil.inconsistencyFound(hbck)) {
957        // Find the inconsistency during HBCK. Leave table and namespace undropped so that
958        // we can check outside the test.
959        keepObjectsAtTheEnd = true;
960      }
961      HbckTestingUtil.assertNoErrors(hbck);
962      LOG.info("Finished hbck");
963    } finally {
964      if (hbck != null) {
965        hbck.close();
966      }
967    }
968    return 0;
969  }
970
971  @Override
972  public TableName getTablename() {
973    return null; // This test is not inteded to run with stock Chaos Monkey
974  }
975
976  @Override
977  protected Set<String> getColumnFamilies() {
978    return null; // This test is not inteded to run with stock Chaos Monkey
979  }
980
981  public static void main(String[] args) throws Exception {
982    Configuration conf = HBaseConfiguration.create();
983    IntegrationTestingUtility.setUseDistributedCluster(conf);
984    IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
985    Connection connection = null;
986    int ret = 1;
987    try {
988      // Initialize connection once, then pass to Actions
989      LOG.debug("Setting up connection ...");
990      connection = ConnectionFactory.createConnection(conf);
991      masterFailover.setConnection(connection);
992      ret = ToolRunner.run(conf, masterFailover, args);
993    } catch (IOException e) {
994      LOG.error(HBaseMarkers.FATAL, "Failed to establish connection. Aborting test ...", e);
995    } finally {
996      connection = masterFailover.getConnection();
997      if (connection != null) {
998        connection.close();
999      }
1000      System.exit(ret);
1001    }
1002  }
1003}