001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.util.List;
021import java.util.Objects;
022import java.util.StringJoiner;
023import java.util.concurrent.CompletableFuture;
024import java.util.function.Supplier;
025import java.util.stream.Collectors;
026import org.apache.hadoop.hbase.client.AsyncAdmin;
027import org.apache.hadoop.hbase.client.AsyncConnection;
028import org.junit.ClassRule;
029import org.junit.Rule;
030import org.junit.rules.ExternalResource;
031import org.junit.rules.TestRule;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * A {@link TestRule} that clears all user namespaces and tables {@link ExternalResource#before()
037 * before} the test executes. Can be used in either the {@link Rule} or {@link ClassRule} positions.
038 * Lazily realizes the provided {@link AsyncConnection} so as to avoid initialization races with
039 * other {@link Rule Rules}. <b>Does not</b> {@link AsyncConnection#close() close()} provided
040 * connection instance when finished.
 * <p>
042 * Use in combination with {@link MiniClusterRule} and {@link ConnectionRule}, for example:
043 *
 * <pre>
 * public class TestMyClass {
 *   &#64;ClassRule
 *   public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
 *
 *   private final ConnectionRule connectionRule =
 *     new ConnectionRule(miniClusterRule::createConnection);
 *   private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule =
 *     new ClearUserNamespacesAndTablesRule(connectionRule::getConnection);
 *
 *   &#64;Rule
 *   public TestRule rule =
 *     RuleChain.outerRule(connectionRule).around(clearUserNamespacesAndTablesRule);
 * }
 * </pre>
062 */
063public class ClearUserNamespacesAndTablesRule extends ExternalResource {
064  private static final Logger logger =
065    LoggerFactory.getLogger(ClearUserNamespacesAndTablesRule.class);
066
067  private final Supplier<AsyncConnection> connectionSupplier;
068  private AsyncAdmin admin;
069
070  public ClearUserNamespacesAndTablesRule(final Supplier<AsyncConnection> connectionSupplier) {
071    this.connectionSupplier = connectionSupplier;
072  }
073
074  @Override
075  protected void before() throws Throwable {
076    final AsyncConnection connection = Objects.requireNonNull(connectionSupplier.get());
077    admin = connection.getAdmin();
078
079    clearTablesAndNamespaces().join();
080  }
081
082  private CompletableFuture<Void> clearTablesAndNamespaces() {
083    return deleteUserTables().thenCompose(_void -> deleteUserNamespaces());
084  }
085
086  private CompletableFuture<Void> deleteUserTables() {
087    return listTableNames().thenApply(tableNames -> tableNames.stream()
088      .map(tableName -> disableIfEnabled(tableName).thenCompose(_void -> deleteTable(tableName)))
089      .toArray(CompletableFuture[]::new)).thenCompose(CompletableFuture::allOf);
090  }
091
092  private CompletableFuture<List<TableName>> listTableNames() {
093    return CompletableFuture.runAsync(() -> logger.trace("listing tables"))
094      .thenCompose(_void -> admin.listTableNames(false)).thenApply(tableNames -> {
095        if (logger.isTraceEnabled()) {
096          final StringJoiner joiner = new StringJoiner(", ", "[", "]");
097          tableNames.stream().map(TableName::getNameAsString).forEach(joiner::add);
098          logger.trace("found existing tables {}", joiner.toString());
099        }
100        return tableNames;
101      });
102  }
103
104  private CompletableFuture<Boolean> isTableEnabled(final TableName tableName) {
105    return admin.isTableEnabled(tableName).thenApply(isEnabled -> {
106      logger.trace("table {} is enabled.", tableName);
107      return isEnabled;
108    });
109  }
110
111  private CompletableFuture<Void> disableIfEnabled(final TableName tableName) {
112    return isTableEnabled(tableName).thenCompose(
113      isEnabled -> isEnabled ? disableTable(tableName) : CompletableFuture.completedFuture(null));
114  }
115
116  private CompletableFuture<Void> disableTable(final TableName tableName) {
117    return CompletableFuture.runAsync(() -> logger.trace("disabling enabled table {}", tableName))
118      .thenCompose(_void -> admin.disableTable(tableName));
119  }
120
121  private CompletableFuture<Void> deleteTable(final TableName tableName) {
122    return CompletableFuture.runAsync(() -> logger.trace("deleting disabled table {}", tableName))
123      .thenCompose(_void -> admin.deleteTable(tableName));
124  }
125
126  private CompletableFuture<List<String>> listUserNamespaces() {
127    return CompletableFuture.runAsync(() -> logger.trace("listing namespaces"))
128      .thenCompose(_void -> admin.listNamespaceDescriptors()).thenApply(namespaceDescriptors -> {
129        final StringJoiner joiner = new StringJoiner(", ", "[", "]");
130        final List<String> names = namespaceDescriptors.stream().map(NamespaceDescriptor::getName)
131          .peek(joiner::add).collect(Collectors.toList());
132        logger.trace("found existing namespaces {}", joiner);
133        return names;
134      })
135      .thenApply(namespaces -> namespaces.stream()
136        .filter(
137          namespace -> !Objects.equals(namespace, NamespaceDescriptor.SYSTEM_NAMESPACE.getName()))
138        .filter(
139          namespace -> !Objects.equals(namespace, NamespaceDescriptor.DEFAULT_NAMESPACE.getName()))
140        .collect(Collectors.toList()));
141  }
142
143  private CompletableFuture<Void> deleteNamespace(final String namespace) {
144    return CompletableFuture.runAsync(() -> logger.trace("deleting namespace {}", namespace))
145      .thenCompose(_void -> admin.deleteNamespace(namespace));
146  }
147
148  private CompletableFuture<Void> deleteUserNamespaces() {
149    return listUserNamespaces().thenCompose(namespaces -> CompletableFuture
150      .allOf(namespaces.stream().map(this::deleteNamespace).toArray(CompletableFuture[]::new)));
151  }
152}