001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.util.List; 021import java.util.Objects; 022import java.util.StringJoiner; 023import java.util.concurrent.CompletableFuture; 024import java.util.function.Supplier; 025import java.util.stream.Collectors; 026import org.apache.hadoop.hbase.client.AsyncAdmin; 027import org.apache.hadoop.hbase.client.AsyncConnection; 028import org.junit.ClassRule; 029import org.junit.Rule; 030import org.junit.rules.ExternalResource; 031import org.junit.rules.TestRule; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * A {@link TestRule} that clears all user namespaces and tables {@link ExternalResource#before() 037 * before} the test executes. Can be used in either the {@link Rule} or {@link ClassRule} positions. 038 * Lazily realizes the provided {@link AsyncConnection} so as to avoid initialization races with 039 * other {@link Rule Rules}. <b>Does not</b> {@link AsyncConnection#close() close()} provided 040 * connection instance when finished. 041 * </p> 042 * Use in combination with {@link MiniClusterRule} and {@link ConnectionRule}, for example: 043 * 044 * <pre> 045 * { 046 * @code 047 * public class TestMyClass { 048 * @ClassRule 049 * public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build(); 050 * 051 * private final ConnectionRule connectionRule = 052 * new ConnectionRule(miniClusterRule::createConnection); 053 * private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule = 054 * new ClearUserNamespacesAndTablesRule(connectionRule::getConnection); 055 * 056 * @Rule 057 * public TestRule rule = 058 * RuleChain.outerRule(connectionRule).around(clearUserNamespacesAndTablesRule); 059 * } 060 * } 061 * </pre> 062 */ 063public class ClearUserNamespacesAndTablesRule extends ExternalResource { 064 private static final Logger logger = 065 LoggerFactory.getLogger(ClearUserNamespacesAndTablesRule.class); 066 067 private final Supplier<AsyncConnection> connectionSupplier; 068 private AsyncAdmin admin; 069 070 public ClearUserNamespacesAndTablesRule(final Supplier<AsyncConnection> connectionSupplier) { 071 this.connectionSupplier = connectionSupplier; 072 } 073 074 @Override 075 protected void before() throws Throwable { 076 final AsyncConnection connection = Objects.requireNonNull(connectionSupplier.get()); 077 admin = connection.getAdmin(); 078 079 clearTablesAndNamespaces().join(); 080 } 081 082 private CompletableFuture<Void> clearTablesAndNamespaces() { 083 return deleteUserTables().thenCompose(_void -> deleteUserNamespaces()); 084 } 085 086 private CompletableFuture<Void> deleteUserTables() { 087 return listTableNames().thenApply(tableNames -> tableNames.stream() 088 .map(tableName -> disableIfEnabled(tableName).thenCompose(_void -> deleteTable(tableName))) 089 .toArray(CompletableFuture[]::new)).thenCompose(CompletableFuture::allOf); 090 } 091 092 private CompletableFuture<List<TableName>> listTableNames() { 093 return CompletableFuture.runAsync(() -> logger.trace("listing tables")) 094 .thenCompose(_void -> admin.listTableNames(false)).thenApply(tableNames -> { 095 if (logger.isTraceEnabled()) { 096 final StringJoiner joiner = new StringJoiner(", ", "[", "]"); 097 tableNames.stream().map(TableName::getNameAsString).forEach(joiner::add); 098 logger.trace("found existing tables {}", joiner.toString()); 099 } 100 return tableNames; 101 }); 102 } 103 104 private CompletableFuture<Boolean> isTableEnabled(final TableName tableName) { 105 return admin.isTableEnabled(tableName).thenApply(isEnabled -> { 106 logger.trace("table {} is enabled.", tableName); 107 return isEnabled; 108 }); 109 } 110 111 private CompletableFuture<Void> disableIfEnabled(final TableName tableName) { 112 return isTableEnabled(tableName).thenCompose( 113 isEnabled -> isEnabled ? disableTable(tableName) : CompletableFuture.completedFuture(null)); 114 } 115 116 private CompletableFuture<Void> disableTable(final TableName tableName) { 117 return CompletableFuture.runAsync(() -> logger.trace("disabling enabled table {}", tableName)) 118 .thenCompose(_void -> admin.disableTable(tableName)); 119 } 120 121 private CompletableFuture<Void> deleteTable(final TableName tableName) { 122 return CompletableFuture.runAsync(() -> logger.trace("deleting disabled table {}", tableName)) 123 .thenCompose(_void -> admin.deleteTable(tableName)); 124 } 125 126 private CompletableFuture<List<String>> listUserNamespaces() { 127 return CompletableFuture.runAsync(() -> logger.trace("listing namespaces")) 128 .thenCompose(_void -> admin.listNamespaceDescriptors()).thenApply(namespaceDescriptors -> { 129 final StringJoiner joiner = new StringJoiner(", ", "[", "]"); 130 final List<String> names = namespaceDescriptors.stream().map(NamespaceDescriptor::getName) 131 .peek(joiner::add).collect(Collectors.toList()); 132 logger.trace("found existing namespaces {}", joiner); 133 return names; 134 }) 135 .thenApply(namespaces -> namespaces.stream() 136 .filter( 137 namespace -> !Objects.equals(namespace, NamespaceDescriptor.SYSTEM_NAMESPACE.getName())) 138 .filter( 139 namespace -> !Objects.equals(namespace, NamespaceDescriptor.DEFAULT_NAMESPACE.getName())) 140 .collect(Collectors.toList())); 141 } 142 143 private CompletableFuture<Void> deleteNamespace(final String namespace) { 144 return CompletableFuture.runAsync(() -> logger.trace("deleting namespace {}", namespace)) 145 .thenCompose(_void -> admin.deleteNamespace(namespace)); 146 } 147 148 private CompletableFuture<Void> deleteUserNamespaces() { 149 return listUserNamespaces().thenCompose(namespaces -> CompletableFuture 150 .allOf(namespaces.stream().map(this::deleteNamespace).toArray(CompletableFuture[]::new))); 151 } 152}