001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.regex.Pattern;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.client.Admin;
030import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.client.TableDescriptor;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
040import org.apache.hadoop.hbase.log.HBaseMarkers;
041import org.apache.hadoop.hbase.testclassification.IntegrationTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.HBaseFsck;
044import org.apache.hadoop.hbase.util.Threads;
045import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
046import org.apache.hadoop.util.ToolRunner;
047import org.junit.Assert;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Integration test that verifies Procedure V2. DDL operations should go through (rollforward or
055 * rollback) when primary master is killed by ChaosMonkey (default MASTER_KILLING).
056 * <p>
057 * </p>
058 * Multiple Worker threads are started to randomly do the following Actions in loops: Actions
059 * generating and populating tables:
060 * <ul>
061 * <li>CreateTableAction</li>
062 * <li>DisableTableAction</li>
063 * <li>EnableTableAction</li>
064 * <li>DeleteTableAction</li>
065 * <li>AddRowAction</li>
066 * </ul>
067 * Actions performing column family DDL operations:
068 * <ul>
069 * <li>AddColumnFamilyAction</li>
 * <li>AlterFamilyVersionsAction</li>
 * <li>AlterFamilyEncodingAction</li>
072 * <li>DeleteColumnFamilyAction</li>
073 * </ul>
074 * Actions performing namespace DDL operations:
075 * <ul>
 * <li>CreateNamespaceAction</li>
 * <li>ModifyNamespaceAction</li>
078 * <li>DeleteNamespaceAction</li>
079 * </ul>
080 * <br/>
 * The threads run for a period of time (default 20 minutes) and are stopped at the end of the
 * runtime. Verification is performed against the following checkpoints:
083 * <ol>
084 * <li>No Actions throw Exceptions.</li>
085 * <li>No inconsistencies are detected in hbck.</li>
086 * </ol>
087 * <p>
088 * This test should be run by the hbase user since it invokes hbck at the end
089 * </p>
090 * <p>
091 * Usage: hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
092 * -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
093 * -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
094 * -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
095 */
096
097@Category(IntegrationTests.class)
098public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {
099
  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestDDLMasterFailover.class);

  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster

  // Default soft limit on the test runtime: 20 minutes, expressed in milliseconds.
  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;

  // Default number of concurrent worker threads.
  protected static final int DEFAULT_NUM_THREADS = 20;

  protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables

  // When true, cleanUpCluster() leaves the ittable-* tables and itnamespace* namespaces in place.
  // NOTE(review): not assigned in this section; presumably set from a command-line option.
  private boolean keepObjectsAtTheEnd = false;
  // Cluster handle cached by setUpCluster().
  protected HBaseClusterInterface cluster;

  // Shared client connection, created lazily by getConnection().
  protected Connection connection;

  /**
   * Configuration key templates. {@code %s} is replaced by a class simple name, e.g.
   * {@code hbase.IntegrationTestDDLMasterFailover.runtime}. The runtime key is a soft limit on how
   * long we should run (in milliseconds).
   */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";

  // NOTE(review): presumably cleared to tell the worker threads to stop; not used in this section.
  protected AtomicBoolean running = new AtomicBoolean(true);

  // NOTE(review): not used in this section; purpose to be confirmed.
  protected AtomicBoolean create_table = new AtomicBoolean(true);

  // Worker thread count and regions per pre-split table (numRegions is written by
  // CreateTableAction).
  protected int numThreads, numRegions;

  // Namespaces created by this test and believed to exist, keyed by name. An action removes an
  // entry while it works on that namespace (see NamespaceAction#selectNamespace).
  ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap = new ConcurrentHashMap<>();

  // Tables believed to be enabled, keyed by name; entries are removed while an action owns them.
  ConcurrentHashMap<TableName, TableDescriptor> enabledTables = new ConcurrentHashMap<>();

  // Tables believed to be disabled, keyed by name; entries are removed while an action owns them.
  ConcurrentHashMap<TableName, TableDescriptor> disabledTables = new ConcurrentHashMap<>();

  // Tables deleted by DeleteTableAction; verifyTables() asserts that they no longer exist.
  ConcurrentHashMap<TableName, TableDescriptor> deletedTables = new ConcurrentHashMap<>();
135
136  @Override
137  public void setUpCluster() throws Exception {
138    util = getTestingUtil(getConf());
139    LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
140    util.initializeCluster(getMinServerCount());
141    LOG.debug("Done initializing/checking cluster");
142    cluster = util.getHBaseClusterInterface();
143  }
144
145  @Override
146  public void cleanUpCluster() throws Exception {
147    if (!keepObjectsAtTheEnd) {
148      Admin admin = util.getAdmin();
149      for (TableName tableName : admin.listTableNames(Pattern.compile("ittable-\\d+"))) {
150        admin.disableTable(tableName);
151        admin.deleteTable(tableName);
152      }
153      NamespaceDescriptor[] nsds = admin.listNamespaceDescriptors();
154      for (NamespaceDescriptor nsd : nsds) {
155        if (nsd.getName().matches("itnamespace\\d+")) {
156          LOG.info("Removing namespace=" + nsd.getName());
157          admin.deleteNamespace(nsd.getName());
158        }
159      }
160    }
161
162    enabledTables.clear();
163    disabledTables.clear();
164    deletedTables.clear();
165    namespaceMap.clear();
166
167    Connection connection = getConnection();
168    connection.close();
169    super.cleanUpCluster();
170  }
171
172  protected int getMinServerCount() {
173    return SERVER_COUNT;
174  }
175
176  protected synchronized void setConnection(Connection connection) {
177    this.connection = connection;
178  }
179
180  protected synchronized Connection getConnection() {
181    if (this.connection == null) {
182      try {
183        Connection connection = ConnectionFactory.createConnection(getConf());
184        setConnection(connection);
185      } catch (IOException e) {
186        LOG.error(HBaseMarkers.FATAL, "Failed to establish connection.", e);
187      }
188    }
189    return connection;
190  }
191
192  protected void verifyNamespaces() throws IOException {
193    Connection connection = getConnection();
194    Admin admin = connection.getAdmin();
195    // iterating concurrent map
196    for (String nsName : namespaceMap.keySet()) {
197      try {
198        Assert.assertTrue("Namespace: " + nsName + " in namespaceMap does not exist",
199          admin.getNamespaceDescriptor(nsName) != null);
200      } catch (NamespaceNotFoundException nsnfe) {
201        Assert
202          .fail("Namespace: " + nsName + " in namespaceMap does not exist: " + nsnfe.getMessage());
203      }
204    }
205    admin.close();
206  }
207
208  protected void verifyTables() throws IOException {
209    Connection connection = getConnection();
210    Admin admin = connection.getAdmin();
211    // iterating concurrent map
212    for (TableName tableName : enabledTables.keySet()) {
213      Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
214        admin.isTableEnabled(tableName));
215    }
216    for (TableName tableName : disabledTables.keySet()) {
217      Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
218        admin.isTableDisabled(tableName));
219    }
220    for (TableName tableName : deletedTables.keySet()) {
221      Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
222        admin.tableExists(tableName));
223    }
224    admin.close();
225  }
226
227  @Test
228  public void testAsUnitTest() throws Exception {
229    runTest();
230  }
231
232  @Override
233  public int runTestFromCommandLine() throws Exception {
234    int ret = runTest();
235    return ret;
236  }
237
  /**
   * Base class for one step (DDL or data operation) executed by the worker threads.
   */
  private abstract class MasterAction {
    // Shared connection captured when the action object is constructed; may be null if
    // getConnection() failed to create one.
    Connection connection = getConnection();

    /** Performs this action against the cluster; failures propagate to the caller. */
    abstract void perform() throws IOException;
  }
243
244  private abstract class NamespaceAction extends MasterAction {
245    final String nsTestConfigKey = "hbase.namespace.testKey";
246
247    // NamespaceAction has implemented selectNamespace() shared by multiple namespace Actions
248    protected NamespaceDescriptor
249      selectNamespace(ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap) {
250      // synchronization to prevent removal from multiple threads
251      synchronized (namespaceMap) {
252        // randomly select namespace from namespaceMap
253        if (namespaceMap.isEmpty()) {
254          return null;
255        }
256        ArrayList<String> namespaceList = new ArrayList<>(namespaceMap.keySet());
257        String randomKey =
258          namespaceList.get(ThreadLocalRandom.current().nextInt(namespaceList.size()));
259        NamespaceDescriptor randomNsd = namespaceMap.get(randomKey);
260        // remove from namespaceMap
261        namespaceMap.remove(randomKey);
262        return randomNsd;
263      }
264    }
265  }
266
267  private class CreateNamespaceAction extends NamespaceAction {
268    @Override
269    void perform() throws IOException {
270      Admin admin = connection.getAdmin();
271      try {
272        NamespaceDescriptor nsd;
273        while (true) {
274          nsd = createNamespaceDesc();
275          try {
276            if (admin.getNamespaceDescriptor(nsd.getName()) != null) {
277              // the namespace has already existed.
278              continue;
279            } else {
280              // currently, the code never return null - always throws exception if
281              // namespace is not found - this just a defensive programming to make
282              // sure null situation is handled in case the method changes in the
283              // future.
284              break;
285            }
286          } catch (NamespaceNotFoundException nsnfe) {
287            // This is expected for a random generated NamespaceDescriptor
288            break;
289          }
290        }
291        LOG.info("Creating namespace:" + nsd);
292        admin.createNamespace(nsd);
293        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(nsd.getName());
294        Assert.assertTrue("Namespace: " + nsd + " was not created", freshNamespaceDesc != null);
295        namespaceMap.put(nsd.getName(), freshNamespaceDesc);
296        LOG.info("Created namespace:" + freshNamespaceDesc);
297      } catch (Exception e) {
298        LOG.warn("Caught exception in action: " + this.getClass());
299        throw e;
300      } finally {
301        admin.close();
302      }
303    }
304
305    private NamespaceDescriptor createNamespaceDesc() {
306      String namespaceName =
307        "itnamespace" + String.format("%010d", ThreadLocalRandom.current().nextInt());
308      NamespaceDescriptor nsd = NamespaceDescriptor.create(namespaceName).build();
309
310      nsd.setConfiguration(nsTestConfigKey,
311        String.format("%010d", ThreadLocalRandom.current().nextInt()));
312      return nsd;
313    }
314  }
315
316  private class ModifyNamespaceAction extends NamespaceAction {
317    @Override
318    void perform() throws IOException {
319      NamespaceDescriptor selected = selectNamespace(namespaceMap);
320      if (selected == null) {
321        return;
322      }
323
324      Admin admin = connection.getAdmin();
325      try {
326        String namespaceName = selected.getName();
327        LOG.info("Modifying namespace :" + selected);
328        NamespaceDescriptor modifiedNsd = NamespaceDescriptor.create(namespaceName).build();
329        String nsValueNew;
330        do {
331          nsValueNew = String.format("%010d", ThreadLocalRandom.current().nextInt());
332        } while (selected.getConfigurationValue(nsTestConfigKey).equals(nsValueNew));
333        modifiedNsd.setConfiguration(nsTestConfigKey, nsValueNew);
334        admin.modifyNamespace(modifiedNsd);
335        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(namespaceName);
336        Assert.assertTrue("Namespace: " + selected + " was not modified",
337          freshNamespaceDesc.getConfigurationValue(nsTestConfigKey).equals(nsValueNew));
338        Assert.assertTrue("Namespace: " + namespaceName + " does not exist",
339          admin.getNamespaceDescriptor(namespaceName) != null);
340        namespaceMap.put(namespaceName, freshNamespaceDesc);
341        LOG.info("Modified namespace :" + freshNamespaceDesc);
342      } catch (Exception e) {
343        LOG.warn("Caught exception in action: " + this.getClass());
344        throw e;
345      } finally {
346        admin.close();
347      }
348    }
349  }
350
351  private class DeleteNamespaceAction extends NamespaceAction {
352    @Override
353    void perform() throws IOException {
354      NamespaceDescriptor selected = selectNamespace(namespaceMap);
355      if (selected == null) {
356        return;
357      }
358
359      Admin admin = connection.getAdmin();
360      try {
361        String namespaceName = selected.getName();
362        LOG.info("Deleting namespace :" + selected);
363        admin.deleteNamespace(namespaceName);
364        try {
365          if (admin.getNamespaceDescriptor(namespaceName) != null) {
366            // the namespace still exists.
367            Assert.assertTrue("Namespace: " + selected + " was not deleted", false);
368          } else {
369            LOG.info("Deleted namespace :" + selected);
370          }
371        } catch (NamespaceNotFoundException nsnfe) {
372          // This is expected result
373          LOG.info("Deleted namespace :" + selected);
374        }
375      } catch (Exception e) {
376        LOG.warn("Caught exception in action: " + this.getClass());
377        throw e;
378      } finally {
379        admin.close();
380      }
381    }
382  }
383
384  private abstract class TableAction extends MasterAction {
385    // TableAction has implemented selectTable() shared by multiple table Actions
386    protected TableDescriptor selectTable(ConcurrentHashMap<TableName, TableDescriptor> tableMap) {
387      // synchronization to prevent removal from multiple threads
388      synchronized (tableMap) {
389        // randomly select table from tableMap
390        if (tableMap.isEmpty()) {
391          return null;
392        }
393        ArrayList<TableName> tableList = new ArrayList<>(tableMap.keySet());
394        TableName key = tableList.get(ThreadLocalRandom.current().nextInt(tableList.size()));
395        TableDescriptor randomTd = tableMap.remove(key);
396        return randomTd;
397      }
398    }
399  }
400
401  private class CreateTableAction extends TableAction {
402
403    @Override
404    void perform() throws IOException {
405      Admin admin = connection.getAdmin();
406      try {
407        TableDescriptor td = createTableDesc();
408        TableName tableName = td.getTableName();
409        if (admin.tableExists(tableName)) {
410          return;
411        }
412        String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName());
413        numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
414        byte[] startKey = Bytes.toBytes("row-0000000000");
415        byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
416        LOG.info("Creating table:" + td);
417        admin.createTable(td, startKey, endKey, numRegions);
418        Assert.assertTrue("Table: " + td + " was not created", admin.tableExists(tableName));
419        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
420        Assert.assertTrue("After create, Table: " + tableName + " in not enabled",
421          admin.isTableEnabled(tableName));
422        enabledTables.put(tableName, freshTableDesc);
423        LOG.info("Created table:" + freshTableDesc);
424      } catch (Exception e) {
425        LOG.warn("Caught exception in action: " + this.getClass());
426        throw e;
427      } finally {
428        admin.close();
429      }
430    }
431
432    private TableDescriptor createTableDesc() {
433      String tableName =
434        String.format("ittable-%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
435      String familyName = "cf-" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
436      return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
437        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName)).build();
438    }
439  }
440
441  private class DisableTableAction extends TableAction {
442
443    @Override
444    void perform() throws IOException {
445
446      TableDescriptor selected = selectTable(enabledTables);
447      if (selected == null) {
448        return;
449      }
450
451      Admin admin = connection.getAdmin();
452      try {
453        TableName tableName = selected.getTableName();
454        LOG.info("Disabling table :" + selected);
455        admin.disableTable(tableName);
456        Assert.assertTrue("Table: " + selected + " was not disabled",
457          admin.isTableDisabled(tableName));
458        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
459        Assert.assertTrue("After disable, Table: " + tableName + " is not disabled",
460          admin.isTableDisabled(tableName));
461        disabledTables.put(tableName, freshTableDesc);
462        LOG.info("Disabled table :" + freshTableDesc);
463      } catch (Exception e) {
464        LOG.warn("Caught exception in action: " + this.getClass());
465        // TODO workaround
466        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
467        // operations
468        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
469        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
470        // 2) if master failover happens in the middle of the enable/disable operation, the new
471        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
472        // AssignmentManager#recoverTableInEnablingState() and
473        // AssignmentManager#recoverTableInDisablingState()
474        // 3) after the new master initialization completes, the procedure tries to re-do the
475        // enable/disable operation, which was already done. Ignore those exceptions before change
476        // of behaviors of AssignmentManager in presence of PV2
477        if (e instanceof TableNotEnabledException) {
478          LOG.warn("Caught TableNotEnabledException in action: " + this.getClass());
479          e.printStackTrace();
480        } else {
481          throw e;
482        }
483      } finally {
484        admin.close();
485      }
486    }
487  }
488
489  private class EnableTableAction extends TableAction {
490
491    @Override
492    void perform() throws IOException {
493
494      TableDescriptor selected = selectTable(disabledTables);
495      if (selected == null) {
496        return;
497      }
498
499      Admin admin = connection.getAdmin();
500      try {
501        TableName tableName = selected.getTableName();
502        LOG.info("Enabling table :" + selected);
503        admin.enableTable(tableName);
504        Assert.assertTrue("Table: " + selected + " was not enabled",
505          admin.isTableEnabled(tableName));
506        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
507        Assert.assertTrue("After enable, Table: " + tableName + " in not enabled",
508          admin.isTableEnabled(tableName));
509        enabledTables.put(tableName, freshTableDesc);
510        LOG.info("Enabled table :" + freshTableDesc);
511      } catch (Exception e) {
512        LOG.warn("Caught exception in action: " + this.getClass());
513        // TODO workaround
514        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
515        // operations 1) when enable/disable starts, the table state is changed to
516        // ENABLING/DISABLING (ZK node in 1.x), which will be further changed to ENABLED/DISABLED
517        // once the operation completes 2) if master failover happens in the middle of the
518        // enable/disable operation, the new master will try to recover the tables in
519        // ENABLING/DISABLING state, as programmed in
520        // AssignmentManager#recoverTableInEnablingState() and
521        // AssignmentManager#recoverTableInDisablingState()
522        // 3) after the new master initialization completes, the procedure tries to re-do the
523        // enable/disable operation, which was already done. Ignore those exceptions before
524        // change of behaviors of AssignmentManager in presence of PV2
525        if (e instanceof TableNotDisabledException) {
526          LOG.warn("Caught TableNotDisabledException in action: " + this.getClass());
527          e.printStackTrace();
528        } else {
529          throw e;
530        }
531      } finally {
532        admin.close();
533      }
534    }
535  }
536
537  private class DeleteTableAction extends TableAction {
538
539    @Override
540    void perform() throws IOException {
541
542      TableDescriptor selected = selectTable(disabledTables);
543      if (selected == null) {
544        return;
545      }
546
547      Admin admin = connection.getAdmin();
548      try {
549        TableName tableName = selected.getTableName();
550        LOG.info("Deleting table :" + selected);
551        admin.deleteTable(tableName);
552        Assert.assertFalse("Table: " + selected + " was not deleted", admin.tableExists(tableName));
553        deletedTables.put(tableName, selected);
554        LOG.info("Deleted table :" + selected);
555      } catch (Exception e) {
556        LOG.warn("Caught exception in action: " + this.getClass());
557        throw e;
558      } finally {
559        admin.close();
560      }
561    }
562  }
563
564  private abstract class ColumnAction extends TableAction {
565    // ColumnAction has implemented selectFamily() shared by multiple family Actions
566    protected ColumnFamilyDescriptor selectFamily(TableDescriptor td) {
567      if (td == null) {
568        return null;
569      }
570      ColumnFamilyDescriptor[] families = td.getColumnFamilies();
571      if (families.length == 0) {
572        LOG.info("No column families in table: " + td);
573        return null;
574      }
575      return families[ThreadLocalRandom.current().nextInt(families.length)];
576    }
577  }
578
579  private class AddColumnFamilyAction extends ColumnAction {
580
581    @Override
582    void perform() throws IOException {
583      TableDescriptor selected = selectTable(disabledTables);
584      if (selected == null) {
585        return;
586      }
587
588      Admin admin = connection.getAdmin();
589      try {
590        ColumnFamilyDescriptor cfd = createFamilyDesc();
591        if (selected.hasColumnFamily(cfd.getName())) {
592          LOG.info(
593            Bytes.toString(cfd.getName()) + " already exists in table " + selected.getTableName());
594          return;
595        }
596        TableName tableName = selected.getTableName();
597        LOG.info("Adding column family: " + cfd + " to table: " + tableName);
598        admin.addColumnFamily(tableName, cfd);
599        // assertion
600        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
601        Assert.assertTrue("Column family: " + cfd + " was not added",
602          freshTableDesc.hasColumnFamily(cfd.getName()));
603        Assert.assertTrue("After add column family, Table: " + tableName + " is not disabled",
604          admin.isTableDisabled(tableName));
605        disabledTables.put(tableName, freshTableDesc);
606        LOG.info("Added column family: " + cfd + " to table: " + tableName);
607      } catch (Exception e) {
608        LOG.warn("Caught exception in action: " + this.getClass());
609        throw e;
610      } finally {
611        admin.close();
612      }
613    }
614
615    private ColumnFamilyDescriptor createFamilyDesc() {
616      String familyName = String.format("cf-%010d", ThreadLocalRandom.current().nextInt());
617      return ColumnFamilyDescriptorBuilder.of(familyName);
618    }
619  }
620
621  private class AlterFamilyVersionsAction extends ColumnAction {
622
623    @Override
624    void perform() throws IOException {
625      TableDescriptor selected = selectTable(disabledTables);
626      if (selected == null) {
627        return;
628      }
629      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
630      if (columnDesc == null) {
631        return;
632      }
633
634      Admin admin = connection.getAdmin();
635      int versions = ThreadLocalRandom.current().nextInt(10) + 3;
636      try {
637        TableName tableName = selected.getTableName();
638        LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions
639          + " in table: " + tableName);
640
641        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
642          .setMinVersions(versions).setMaxVersions(versions).build();
643        TableDescriptor td =
644          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
645        admin.modifyTable(td);
646
647        // assertion
648        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
649        ColumnFamilyDescriptor freshColumnDesc =
650          freshTableDesc.getColumnFamily(columnDesc.getName());
651        Assert.assertEquals("Column family: " + columnDesc + " was not altered",
652          freshColumnDesc.getMaxVersions(), versions);
653        Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered",
654          freshColumnDesc.getMinVersions(), versions);
655        Assert.assertTrue(
656          "After alter versions of column family, Table: " + tableName + " is not disabled",
657          admin.isTableDisabled(tableName));
658        disabledTables.put(tableName, freshTableDesc);
659        LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions
660          + " in table: " + tableName);
661      } catch (Exception e) {
662        LOG.warn("Caught exception in action: " + this.getClass());
663        throw e;
664      } finally {
665        admin.close();
666      }
667    }
668  }
669
670  private class AlterFamilyEncodingAction extends ColumnAction {
671
672    @Override
673    void perform() throws IOException {
674      TableDescriptor selected = selectTable(disabledTables);
675      if (selected == null) {
676        return;
677      }
678      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
679      if (columnDesc == null) {
680        return;
681      }
682
683      Admin admin = connection.getAdmin();
684      try {
685        TableName tableName = selected.getTableName();
686        // possible DataBlockEncoding ids
687        DataBlockEncoding[] possibleIds = { DataBlockEncoding.NONE, DataBlockEncoding.PREFIX,
688          DataBlockEncoding.DIFF, DataBlockEncoding.FAST_DIFF, DataBlockEncoding.ROW_INDEX_V1 };
689        short id = possibleIds[ThreadLocalRandom.current().nextInt(possibleIds.length)].getId();
690        LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id + " in table: "
691          + tableName);
692
693        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
694          .setDataBlockEncoding(DataBlockEncoding.getEncodingById(id)).build();
695        TableDescriptor td =
696          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
697        admin.modifyTable(td);
698
699        // assertion
700        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
701        ColumnFamilyDescriptor freshColumnDesc =
702          freshTableDesc.getColumnFamily(columnDesc.getName());
703        Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered",
704          freshColumnDesc.getDataBlockEncoding().getId(), id);
705        Assert.assertTrue(
706          "After alter encoding of column family, Table: " + tableName + " is not disabled",
707          admin.isTableDisabled(tableName));
708        disabledTables.put(tableName, freshTableDesc);
709        LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id
710          + " in table: " + tableName);
711      } catch (Exception e) {
712        LOG.warn("Caught exception in action: " + this.getClass());
713        throw e;
714      } finally {
715        admin.close();
716      }
717    }
718  }
719
720  private class DeleteColumnFamilyAction extends ColumnAction {
721
722    @Override
723    void perform() throws IOException {
724      TableDescriptor selected = selectTable(disabledTables);
725      ColumnFamilyDescriptor cfd = selectFamily(selected);
726      if (selected == null || cfd == null) {
727        return;
728      }
729
730      Admin admin = connection.getAdmin();
731      try {
732        if (selected.getColumnFamilyCount() < 2) {
733          LOG.info("No enough column families to delete in table " + selected.getTableName());
734          return;
735        }
736        TableName tableName = selected.getTableName();
737        LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
738        admin.deleteColumnFamily(tableName, cfd.getName());
739        // assertion
740        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
741        Assert.assertFalse("Column family: " + cfd + " was not added",
742          freshTableDesc.hasColumnFamily(cfd.getName()));
743        Assert.assertTrue("After delete column family, Table: " + tableName + " is not disabled",
744          admin.isTableDisabled(tableName));
745        disabledTables.put(tableName, freshTableDesc);
746        LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
747      } catch (Exception e) {
748        LOG.warn("Caught exception in action: " + this.getClass());
749        throw e;
750      } finally {
751        admin.close();
752      }
753    }
754  }
755
756  private class AddRowAction extends ColumnAction {
757    // populate tables
758    @Override
759    void perform() throws IOException {
760      TableDescriptor selected = selectTable(enabledTables);
761      if (selected == null) {
762        return;
763      }
764
765      Admin admin = connection.getAdmin();
766      TableName tableName = selected.getTableName();
767      try (Table table = connection.getTable(tableName)) {
768        ArrayList<RegionInfo> regionInfos =
769          new ArrayList<>(admin.getRegions(selected.getTableName()));
770        int numRegions = regionInfos.size();
771        // average number of rows to be added per action to each region
772        int average_rows = 1;
773        int numRows = average_rows * numRegions;
774        LOG.info("Adding " + numRows + " rows to table: " + selected);
775        byte[] value = new byte[10];
776        for (int i = 0; i < numRows; i++) {
777          // nextInt(Integer.MAX_VALUE)) to return positive numbers only
778          byte[] rowKey =
779            Bytes.toBytes("row-" + String.format("%010d", ThreadLocalRandom.current().nextInt()));
780          ColumnFamilyDescriptor cfd = selectFamily(selected);
781          if (cfd == null) {
782            return;
783          }
784          byte[] family = cfd.getName();
785          byte[] qualifier = Bytes.toBytes("col-" + ThreadLocalRandom.current().nextInt(10));
786          Bytes.random(value);
787          Put put = new Put(rowKey);
788          put.addColumn(family, qualifier, value);
789          table.put(put);
790        }
791        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
792        Assert.assertTrue("After insert, Table: " + tableName + " in not enabled",
793          admin.isTableEnabled(tableName));
794        enabledTables.put(tableName, freshTableDesc);
795        LOG.info("Added " + numRows + " rows to table: " + selected);
796      } catch (Exception e) {
797        LOG.warn("Caught exception in action: " + this.getClass());
798        throw e;
799      } finally {
800        admin.close();
801      }
802    }
803  }
804
805  private enum ACTION {
806    CREATE_NAMESPACE,
807    MODIFY_NAMESPACE,
808    DELETE_NAMESPACE,
809    CREATE_TABLE,
810    DISABLE_TABLE,
811    ENABLE_TABLE,
812    DELETE_TABLE,
813    ADD_COLUMNFAMILY,
814    DELETE_COLUMNFAMILY,
815    ALTER_FAMILYVERSIONS,
816    ALTER_FAMILYENCODING,
817    ADD_ROW
818  }
819
820  private class Worker extends Thread {
821
822    private Exception savedException;
823
824    private ACTION action;
825
826    @Override
827    public void run() {
828      while (running.get()) {
829        // select random action
830        ACTION selectedAction =
831          ACTION.values()[ThreadLocalRandom.current().nextInt(ACTION.values().length)];
832        this.action = selectedAction;
833        LOG.info("Performing Action: " + selectedAction);
834
835        try {
836          switch (selectedAction) {
837            case CREATE_NAMESPACE:
838              new CreateNamespaceAction().perform();
839              break;
840            case MODIFY_NAMESPACE:
841              new ModifyNamespaceAction().perform();
842              break;
843            case DELETE_NAMESPACE:
844              new DeleteNamespaceAction().perform();
845              break;
846            case CREATE_TABLE:
847              // stop creating new tables in the later stage of the test to avoid too many empty
848              // tables
849              if (create_table.get()) {
850                new CreateTableAction().perform();
851              }
852              break;
853            case ADD_ROW:
854              new AddRowAction().perform();
855              break;
856            case DISABLE_TABLE:
857              new DisableTableAction().perform();
858              break;
859            case ENABLE_TABLE:
860              new EnableTableAction().perform();
861              break;
862            case DELETE_TABLE:
863              // reduce probability of deleting table to 20%
864              if (ThreadLocalRandom.current().nextInt(100) < 20) {
865                new DeleteTableAction().perform();
866              }
867              break;
868            case ADD_COLUMNFAMILY:
869              new AddColumnFamilyAction().perform();
870              break;
871            case DELETE_COLUMNFAMILY:
872              // reduce probability of deleting column family to 20%
873              if (ThreadLocalRandom.current().nextInt(100) < 20) {
874                new DeleteColumnFamilyAction().perform();
875              }
876              break;
877            case ALTER_FAMILYVERSIONS:
878              new AlterFamilyVersionsAction().perform();
879              break;
880            case ALTER_FAMILYENCODING:
881              new AlterFamilyEncodingAction().perform();
882              break;
883          }
884        } catch (Exception ex) {
885          this.savedException = ex;
886          return;
887        }
888      }
889      LOG.info(this.getName() + " stopped");
890    }
891
892    public Exception getSavedException() {
893      return this.savedException;
894    }
895
896    public ACTION getAction() {
897      return this.action;
898    }
899  }
900
901  private void checkException(List<Worker> workers) {
902    if (workers == null || workers.isEmpty()) return;
903    for (Worker worker : workers) {
904      Exception e = worker.getSavedException();
905      if (e != null) {
906        LOG.error("Found exception in thread: " + worker.getName());
907        e.printStackTrace();
908      }
909      Assert.assertNull("Action failed: " + worker.getAction() + " in thread: " + worker.getName(),
910        e);
911    }
912  }
913
914  private int runTest() throws Exception {
915    LOG.info("Starting the test");
916
917    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
918    long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);
919
920    String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
921    numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);
922
923    ArrayList<Worker> workers = new ArrayList<>(numThreads);
924    for (int i = 0; i < numThreads; i++) {
925      checkException(workers);
926      Worker worker = new Worker();
927      LOG.info("Launching worker thread " + worker.getName());
928      workers.add(worker);
929      worker.start();
930    }
931
932    Threads.sleep(runtime / 2);
933    LOG.info("Stopping creating new tables");
934    create_table.set(false);
935    Threads.sleep(runtime / 2);
936    LOG.info("Runtime is up");
937    running.set(false);
938
939    checkException(workers);
940
941    for (Worker worker : workers) {
942      worker.join();
943    }
944    LOG.info("All Worker threads stopped");
945
946    // verify
947    LOG.info("Verify actions of all threads succeeded");
948    checkException(workers);
949    LOG.info("Verify namespaces");
950    verifyNamespaces();
951    LOG.info("Verify states of all tables");
952    verifyTables();
953
954    // RUN HBCK
955
956    HBaseFsck hbck = null;
957    try {
958      LOG.info("Running hbck");
959      hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
960      if (HbckTestingUtil.inconsistencyFound(hbck)) {
961        // Find the inconsistency during HBCK. Leave table and namespace undropped so that
962        // we can check outside the test.
963        keepObjectsAtTheEnd = true;
964      }
965      HbckTestingUtil.assertNoErrors(hbck);
966      LOG.info("Finished hbck");
967    } finally {
968      if (hbck != null) {
969        hbck.close();
970      }
971    }
972    return 0;
973  }
974
975  @Override
976  public TableName getTablename() {
977    return null; // This test is not inteded to run with stock Chaos Monkey
978  }
979
980  @Override
981  protected Set<String> getColumnFamilies() {
982    return null; // This test is not inteded to run with stock Chaos Monkey
983  }
984
985  public static void main(String[] args) throws Exception {
986    Configuration conf = HBaseConfiguration.create();
987    IntegrationTestingUtility.setUseDistributedCluster(conf);
988    IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
989    Connection connection = null;
990    int ret = 1;
991    try {
992      // Initialize connection once, then pass to Actions
993      LOG.debug("Setting up connection ...");
994      connection = ConnectionFactory.createConnection(conf);
995      masterFailover.setConnection(connection);
996      ret = ToolRunner.run(conf, masterFailover, args);
997    } catch (IOException e) {
998      LOG.error(HBaseMarkers.FATAL, "Failed to establish connection. Aborting test ...", e);
999    } finally {
1000      connection = masterFailover.getConnection();
1001      if (connection != null) {
1002        connection.close();
1003      }
1004      System.exit(ret);
1005    }
1006  }
1007}