/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * A {@link TestRule} that clears all user namespaces and tables
 * {@link ExternalResource#before() before} the test executes. Can be used in either the
 * {@link Rule} or {@link ClassRule} positions. Lazily realizes the provided
 * {@link AsyncConnection} so as to avoid initialization races with other {@link Rule Rules}.
 * <b>Does not</b> {@link AsyncConnection#close() close()} provided connection instance when
 * finished.
 * </p>
 * Use in combination with {@link MiniClusterRule} and {@link ConnectionRule}, for example:
 *
 * <pre>{@code
 *   public class TestMyClass {
 *     @ClassRule
 *     public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
 *
 *     private final ConnectionRule connectionRule =
 *       new ConnectionRule(miniClusterRule::createConnection);
 *     private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule =
 *       new ClearUserNamespacesAndTablesRule(connectionRule::getConnection);
 *
 *     @Rule
 *     public TestRule rule = RuleChain
 *       .outerRule(connectionRule)
 *       .around(clearUserNamespacesAndTablesRule);
 *   }
 * }</pre>
 */
public class ClearUserNamespacesAndTablesRule extends ExternalResource {
  private static final Logger logger =
    LoggerFactory.getLogger(ClearUserNamespacesAndTablesRule.class);

  /** Source of the connection; only invoked from {@link #before()}, never at construction. */
  private final Supplier<AsyncConnection> connectionSupplier;

  /** Admin handle obtained from the supplied connection; assigned in {@link #before()}. */
  private AsyncAdmin admin;

  /**
   * Creates the rule without touching the connection.
   *
   * @param connectionSupplier provides the {@link AsyncConnection} to use; it is first consulted
   *                           when {@link #before()} runs
   */
  public ClearUserNamespacesAndTablesRule(final Supplier<AsyncConnection> connectionSupplier) {
    this.connectionSupplier = connectionSupplier;
  }

  /**
   * Resolves the connection, obtains its {@link AsyncAdmin} and blocks until all tables returned
   * by {@link #listTableNames()} and all user namespaces have been deleted.
   *
   * @throws NullPointerException if the supplier yields a {@code null} connection
   */
  @Override
  protected void before() throws Throwable {
    final AsyncConnection connection = Objects.requireNonNull(connectionSupplier.get(),
      "connectionSupplier must provide a non-null AsyncConnection");
    admin = connection.getAdmin();

    // join() blocks the calling thread until the whole asynchronous cleanup chain has finished.
    clearTablesAndNamespaces().join();
  }

  /**
   * Deletes the tables first and, once that has completed, the user namespaces.
   *
   * @return a future that completes when both phases are done
   */
  private CompletableFuture<Void> clearTablesAndNamespaces() {
    return deleteUserTables().thenCompose(ignored -> deleteUserNamespaces());
  }

  /**
   * Disables (when enabled) and then deletes every table returned by {@link #listTableNames()}.
   * The per-table chains are started together and awaited with
   * {@link CompletableFuture#allOf(CompletableFuture[])}.
   *
   * @return a future that completes when every per-table chain has completed
   */
  private CompletableFuture<Void> deleteUserTables() {
    return listTableNames()
      .thenApply(tableNames -> tableNames.stream()
        .map(tableName -> disableIfEnabled(tableName)
          .thenCompose(ignored -> deleteTable(tableName)))
        .toArray(CompletableFuture[]::new))
      .thenCompose(CompletableFuture::allOf);
  }

  /**
   * Lists table names through the admin and traces what was found.
   *
   * @return a future holding the table names reported by the admin
   */
  private CompletableFuture<List<TableName>> listTableNames() {
    return CompletableFuture
      .runAsync(() -> logger.trace("listing tables"))
      // NOTE(review): the 'false' argument presumably excludes system tables -- confirm against
      // AsyncAdmin#listTableNames(boolean).
      .thenCompose(ignored -> admin.listTableNames(false))
      .thenApply(tableNames -> {
        // Only pay for building the joined string when it will actually be logged.
        if (logger.isTraceEnabled()) {
          final StringJoiner joiner = new StringJoiner(", ", "[", "]");
          tableNames.stream().map(TableName::getNameAsString).forEach(joiner::add);
          logger.trace("found existing tables {}", joiner.toString());
        }
        return tableNames;
      });
  }

  /**
   * Asks the admin whether the table is enabled and traces the reported state.
   *
   * @param tableName the table to check
   * @return a future holding the enabled flag reported by the admin
   */
  private CompletableFuture<Boolean> isTableEnabled(final TableName tableName) {
    return admin.isTableEnabled(tableName)
      .thenApply(isEnabled -> {
        // Report the actual state; the message must not claim "enabled" for a disabled table.
        logger.trace("table {} is {}.", tableName, isEnabled ? "enabled" : "disabled");
        return isEnabled;
      });
  }

  /**
   * Disables the table when the admin reports it as enabled; otherwise does nothing.
   *
   * @param tableName the table to disable if needed
   * @return a future that completes once the table has been disabled, or immediately-completed
   *         when no disable was necessary
   */
  private CompletableFuture<Void> disableIfEnabled(final TableName tableName) {
    return isTableEnabled(tableName)
      .thenCompose(isEnabled -> isEnabled
        ? disableTable(tableName)
        : CompletableFuture.completedFuture(null));
  }

  /**
   * Traces and then issues the disable request for the table.
   *
   * @param tableName the table to disable
   * @return the future returned by the admin's disable call
   */
  private CompletableFuture<Void> disableTable(final TableName tableName) {
    return CompletableFuture
      .runAsync(() -> logger.trace("disabling enabled table {}", tableName))
      .thenCompose(ignored -> admin.disableTable(tableName));
  }

  /**
   * Traces and then issues the delete request for the table.
   *
   * @param tableName the table to delete
   * @return the future returned by the admin's delete call
   */
  private CompletableFuture<Void> deleteTable(final TableName tableName) {
    return CompletableFuture
      .runAsync(() -> logger.trace("deleting disabled table {}", tableName))
      .thenCompose(ignored -> admin.deleteTable(tableName));
  }

  /**
   * Lists all namespaces through the admin, traces them, and drops the system and default
   * namespaces from the result.
   *
   * @return a future holding the names of the remaining (user) namespaces
   */
  private CompletableFuture<List<String>> listUserNamespaces() {
    return CompletableFuture
      .runAsync(() -> logger.trace("listing namespaces"))
      .thenCompose(ignored -> admin.listNamespaceDescriptors())
      .thenApply(namespaceDescriptors -> {
        final List<String> names = namespaceDescriptors.stream()
          .map(NamespaceDescriptor::getName)
          .collect(Collectors.toList());
        // Same guard as in listTableNames(): skip the string building unless it will be logged.
        if (logger.isTraceEnabled()) {
          logger.trace("found existing namespaces {}",
            names.stream().collect(Collectors.joining(", ", "[", "]")));
        }
        return names;
      })
      .thenApply(namespaces -> namespaces.stream()
        .filter(ClearUserNamespacesAndTablesRule::isUserNamespace)
        .collect(Collectors.toList()));
  }

  /**
   * Tells whether a namespace name is neither the system nor the default namespace.
   *
   * @param namespace the namespace name to test; may be {@code null}
   * @return {@code true} when the name differs from both built-in namespace names
   */
  private static boolean isUserNamespace(final String namespace) {
    return !Objects.equals(namespace, NamespaceDescriptor.SYSTEM_NAMESPACE.getName())
      && !Objects.equals(namespace, NamespaceDescriptor.DEFAULT_NAMESPACE.getName());
  }

  /**
   * Traces and then issues the delete request for the namespace.
   *
   * @param namespace the name of the namespace to delete
   * @return the future returned by the admin's delete call
   */
  private CompletableFuture<Void> deleteNamespace(final String namespace) {
    return CompletableFuture
      .runAsync(() -> logger.trace("deleting namespace {}", namespace))
      .thenCompose(ignored -> admin.deleteNamespace(namespace));
  }

  /**
   * Deletes every namespace returned by {@link #listUserNamespaces()}. The deletions are started
   * together and awaited with {@link CompletableFuture#allOf(CompletableFuture[])}.
   *
   * @return a future that completes when every namespace deletion has completed
   */
  private CompletableFuture<Void> deleteUserNamespaces() {
    return listUserNamespaces()
      .thenCompose(namespaces -> CompletableFuture.allOf(namespaces.stream()
        .map(this::deleteNamespace)
        .toArray(CompletableFuture[]::new)));
  }
}