001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.util.List;
021import java.util.Objects;
022import java.util.StringJoiner;
023import java.util.concurrent.CompletableFuture;
024import java.util.function.Supplier;
025import java.util.stream.Collectors;
026import org.apache.hadoop.hbase.client.AsyncAdmin;
027import org.apache.hadoop.hbase.client.AsyncConnection;
028import org.junit.ClassRule;
029import org.junit.Rule;
030import org.junit.rules.ExternalResource;
031import org.junit.rules.TestRule;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * A {@link TestRule} that clears all user namespaces and tables
037 * {@link ExternalResource#before() before} the test executes. Can be used in either the
038 * {@link Rule} or {@link ClassRule} positions. Lazily realizes the provided
039 * {@link AsyncConnection} so as to avoid initialization races with other {@link Rule Rules}.
040 * <b>Does not</b> {@link AsyncConnection#close() close()} provided connection instance when
041 * finished.
042 * </p>
043 * Use in combination with {@link MiniClusterRule} and {@link ConnectionRule}, for example:
044 *
045 * <pre>{@code
046 *   public class TestMyClass {
047 *     @ClassRule
048 *     public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
049 *
050 *     private final ConnectionRule connectionRule =
051 *       new ConnectionRule(miniClusterRule::createConnection);
052 *     private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule =
053 *       new ClearUserNamespacesAndTablesRule(connectionRule::getConnection);
054 *
055 *     @Rule
056 *     public TestRule rule = RuleChain
057 *       .outerRule(connectionRule)
058 *       .around(clearUserNamespacesAndTablesRule);
059 *   }
060 * }</pre>
061 */
062public class ClearUserNamespacesAndTablesRule extends ExternalResource {
063  private static final Logger logger =
064    LoggerFactory.getLogger(ClearUserNamespacesAndTablesRule.class);
065
066  private final Supplier<AsyncConnection> connectionSupplier;
067  private AsyncAdmin admin;
068
069  public ClearUserNamespacesAndTablesRule(final Supplier<AsyncConnection> connectionSupplier) {
070    this.connectionSupplier = connectionSupplier;
071  }
072
073  @Override
074  protected void before() throws Throwable {
075    final AsyncConnection connection = Objects.requireNonNull(connectionSupplier.get());
076    admin = connection.getAdmin();
077
078    clearTablesAndNamespaces().join();
079  }
080
081  private CompletableFuture<Void> clearTablesAndNamespaces() {
082    return deleteUserTables().thenCompose(_void -> deleteUserNamespaces());
083  }
084
085  private CompletableFuture<Void> deleteUserTables() {
086    return listTableNames()
087      .thenApply(tableNames -> tableNames.stream()
088        .map(tableName -> disableIfEnabled(tableName).thenCompose(_void -> deleteTable(tableName)))
089        .toArray(CompletableFuture[]::new))
090      .thenCompose(CompletableFuture::allOf);
091  }
092
093  private CompletableFuture<List<TableName>> listTableNames() {
094    return CompletableFuture
095      .runAsync(() -> logger.trace("listing tables"))
096      .thenCompose(_void -> admin.listTableNames(false))
097      .thenApply(tableNames -> {
098        if (logger.isTraceEnabled()) {
099          final StringJoiner joiner = new StringJoiner(", ", "[", "]");
100          tableNames.stream().map(TableName::getNameAsString).forEach(joiner::add);
101          logger.trace("found existing tables {}", joiner.toString());
102        }
103        return tableNames;
104      });
105  }
106
107  private CompletableFuture<Boolean> isTableEnabled(final TableName tableName) {
108    return admin.isTableEnabled(tableName)
109      .thenApply(isEnabled -> {
110        logger.trace("table {} is enabled.", tableName);
111        return isEnabled;
112      });
113  }
114
115  private CompletableFuture<Void> disableIfEnabled(final TableName tableName) {
116    return isTableEnabled(tableName)
117      .thenCompose(isEnabled -> isEnabled
118        ? disableTable(tableName)
119        : CompletableFuture.completedFuture(null));
120  }
121
122  private CompletableFuture<Void> disableTable(final TableName tableName) {
123    return CompletableFuture
124      .runAsync(() -> logger.trace("disabling enabled table {}", tableName))
125      .thenCompose(_void -> admin.disableTable(tableName));
126  }
127
128  private CompletableFuture<Void> deleteTable(final TableName tableName) {
129    return CompletableFuture
130      .runAsync(() -> logger.trace("deleting disabled table {}", tableName))
131      .thenCompose(_void -> admin.deleteTable(tableName));
132  }
133
134  private CompletableFuture<List<String>> listUserNamespaces() {
135    return CompletableFuture
136      .runAsync(() -> logger.trace("listing namespaces"))
137      .thenCompose(_void -> admin.listNamespaceDescriptors())
138      .thenApply(namespaceDescriptors -> {
139        final StringJoiner joiner = new StringJoiner(", ", "[", "]");
140        final List<String> names = namespaceDescriptors.stream()
141          .map(NamespaceDescriptor::getName)
142          .peek(joiner::add)
143          .collect(Collectors.toList());
144        logger.trace("found existing namespaces {}", joiner);
145        return names;
146      })
147      .thenApply(namespaces -> namespaces.stream()
148        .filter(namespace -> !Objects.equals(
149          namespace, NamespaceDescriptor.SYSTEM_NAMESPACE.getName()))
150        .filter(namespace -> !Objects.equals(
151          namespace, NamespaceDescriptor.DEFAULT_NAMESPACE.getName()))
152        .collect(Collectors.toList()));
153  }
154
155  private CompletableFuture<Void> deleteNamespace(final String namespace) {
156    return CompletableFuture
157      .runAsync(() -> logger.trace("deleting namespace {}", namespace))
158      .thenCompose(_void -> admin.deleteNamespace(namespace));
159  }
160
161  private CompletableFuture<Void> deleteUserNamespaces() {
162    return listUserNamespaces()
163      .thenCompose(namespaces -> CompletableFuture.allOf(namespaces.stream()
164        .map(this::deleteNamespace)
165        .toArray(CompletableFuture[]::new)));
166  }
167}