/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.ExternalResource;

/**
 * A {@link Rule} that manages the lifecycle of a {@link Connection} and/or an
 * {@link AsyncConnection}. Can be used in either the {@link Rule} or {@link ClassRule} positions.
 * <p>
 * Use in combination with {@link MiniClusterRule}, for example:
 *
 * <pre>{@code
 * public class TestMyClass {
 *   private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
 *   private static final ConnectionRule connectionRule =
 *     ConnectionRule.createAsyncConnectionRule(miniClusterRule::createConnection);
 *
 *   @ClassRule
 *   public static final TestRule rule =
 *     RuleChain.outerRule(miniClusterRule).around(connectionRule);
 * }
 * }</pre>
 * <p>
 * When only an asynchronous supplier is configured, the synchronous connection handed out by
 * {@link #getConnection()} is the one returned by {@link AsyncConnection#toConnection()}.
 */
public final class ConnectionRule extends ExternalResource {

  /** Source of the synchronous connection; {@code null} when none was configured. */
  private final Supplier<Connection> connectionSupplier;
  /** Source of the asynchronous connection; {@code null} when none was configured. */
  private final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier;

  /** Populated by {@link #before()}; {@code null} until then. */
  private Connection connection;
  /** Populated by {@link #before()}; {@code null} until then. */
  private AsyncConnection asyncConnection;

  /**
   * Creates a rule that manages only a synchronous {@link Connection}.
   * @param connectionSupplier invoked once per {@link #before()} to obtain the connection
   * @return a new rule instance
   */
  public static ConnectionRule createConnectionRule(final Supplier<Connection> connectionSupplier) {
    return new ConnectionRule(connectionSupplier, null);
  }

  /**
   * Creates a rule that manages an {@link AsyncConnection}. A synchronous view of it is also made
   * available through {@link #getConnection()}.
   * @param asyncConnectionSupplier invoked once per {@link #before()}; the returned future is
   *                                waited on before the test proceeds
   * @return a new rule instance
   */
  public static ConnectionRule createAsyncConnectionRule(
    final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier) {
    return new ConnectionRule(null, asyncConnectionSupplier);
  }

  /**
   * Creates a rule that manages both a synchronous and an asynchronous connection, each obtained
   * from its own supplier.
   * @param connectionSupplier      invoked once per {@link #before()} to obtain the connection
   * @param asyncConnectionSupplier invoked once per {@link #before()}; the returned future is
   *                                waited on before the test proceeds
   * @return a new rule instance
   */
  public static ConnectionRule createConnectionRule(final Supplier<Connection> connectionSupplier,
    final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier) {
    return new ConnectionRule(connectionSupplier, asyncConnectionSupplier);
  }

  private ConnectionRule(final Supplier<Connection> connectionSupplier,
    final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier) {
    this.connectionSupplier = connectionSupplier;
    this.asyncConnectionSupplier = asyncConnectionSupplier;
  }

  /**
   * Returns the synchronous connection established by {@link #before()}.
   * @return the managed connection, never {@code null}
   * @throws IllegalStateException if no synchronous connection is currently held
   */
  public Connection getConnection() {
    if (connection == null) {
      throw new IllegalStateException(
        "ConnectionRule not initialized with a synchronous connection.");
    }
    return connection;
  }

  /**
   * Returns the asynchronous connection established by {@link #before()}.
   * @return the managed asynchronous connection, never {@code null}
   * @throws IllegalStateException if no asynchronous connection is currently held
   */
  public AsyncConnection getAsyncConnection() {
    if (asyncConnection == null) {
      throw new IllegalStateException(
        "ConnectionRule not initialized with an asynchronous connection.");
    }
    return asyncConnection;
  }

  /**
   * Opens the configured connections. If any step fails, whatever was already opened is closed
   * before the failure is rethrown.
   * @throws Throwable whatever a supplier, the future, or {@code toConnection()} throws; failures
   *                   while cleaning up are attached to it as suppressed exceptions
   */
  @Override
  protected void before() throws Throwable {
    // Start from a clean slate so that a second run on the same rule instance does not keep a
    // connection that a previous after() already closed.
    connection = null;
    asyncConnection = null;
    try {
      if (connectionSupplier != null) {
        connection = connectionSupplier.get();
      }
      if (asyncConnectionSupplier != null) {
        asyncConnection = asyncConnectionSupplier.get().join();
      }
      // Only an async supplier was given: expose its synchronous view as well.
      if (connection == null && asyncConnection != null) {
        connection = asyncConnection.toConnection();
      }
    } catch (Throwable setupFailure) {
      // ExternalResource runs after() only once before() has returned normally, so anything
      // opened above would otherwise never be closed.
      try {
        after();
      } catch (Throwable cleanupFailure) {
        setupFailure.addSuppressed(cleanupFailure);
      }
      connection = null;
      asyncConnection = null;
      throw setupFailure;
    }
  }

  /**
   * Closes both connections concurrently and blocks until both closes have finished. An
   * {@link IOException} from either close is wrapped in a {@link RuntimeException}, which
   * {@link CompletableFuture#join()} in turn reports as a completion failure.
   */
  @Override
  protected void after() {
    CompletableFuture<Void> closeConnection = CompletableFuture.runAsync(() -> {
      if (this.connection != null) {
        try {
          connection.close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    });
    CompletableFuture<Void> closeAsyncConnection = CompletableFuture.runAsync(() -> {
      if (this.asyncConnection != null) {
        try {
          asyncConnection.close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    });
    CompletableFuture.allOf(closeConnection, closeAsyncConnection).join();
  }
}