001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION; 021import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION; 022import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION; 023import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION; 024import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION; 025import static org.junit.jupiter.api.Assertions.assertEquals; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027 028import java.io.IOException; 029import java.net.BindException; 030import java.net.InetAddress; 031import java.util.ArrayList; 032import java.util.List; 033import java.util.function.Supplier; 034import java.util.stream.Stream; 035import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.net.BoundSocketMaker; 038import org.apache.hadoop.hbase.testclassification.ClientTests; 039import org.apache.hadoop.hbase.testclassification.LargeTests; 040import org.apache.hadoop.hbase.thrift.generated.Hbase; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 043import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 044import org.apache.hadoop.hbase.util.TableDescriptorChecker; 045import org.apache.hadoop.hbase.util.Threads; 046import org.apache.thrift.protocol.TBinaryProtocol; 047import org.apache.thrift.protocol.TCompactProtocol; 048import org.apache.thrift.protocol.TProtocol; 049import org.apache.thrift.server.TServer; 050import org.apache.thrift.transport.TSocket; 051import org.apache.thrift.transport.TTransport; 052import org.apache.thrift.transport.layered.TFramedTransport; 053import org.junit.jupiter.api.AfterAll; 054import org.junit.jupiter.api.BeforeAll; 055import org.junit.jupiter.api.Tag; 056import org.junit.jupiter.api.TestTemplate; 057import org.junit.jupiter.params.provider.Arguments; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 062 063/** 064 * Start the HBase Thrift server on a random port through the command-line interface and talk to it 065 * from client side. 066 */ 067@Tag(ClientTests.TAG) 068@Tag(LargeTests.TAG) 069@HBaseParameterizedTestTemplate 070public class TestThriftServerCmdLine { 071 072 private static final Logger LOG = LoggerFactory.getLogger(TestThriftServerCmdLine.class); 073 074 protected final ImplType implType; 075 protected boolean specifyFramed; 076 protected boolean specifyBindIP; 077 protected boolean specifyCompact; 078 079 protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 080 081 public static Stream<Arguments> parameters() { 082 List<Arguments> params = new ArrayList<>(); 083 for (ImplType implType : ImplType.values()) { 084 for (boolean specifyFramed : new boolean[] { false, true }) { 085 for (boolean specifyBindIP : new boolean[] { false, true }) { 086 if (specifyBindIP && !implType.canSpecifyBindIP) { 087 continue; 088 } 089 for (boolean specifyCompact : new boolean[] { false, true }) { 090 params.add(Arguments.of(implType, specifyFramed, specifyBindIP, specifyCompact)); 091 } 092 } 093 } 094 } 095 return params.stream(); 096 } 097 098 public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed, boolean specifyBindIP, 099 boolean specifyCompact) { 100 this.implType = implType; 101 this.specifyFramed = specifyFramed; 102 this.specifyBindIP = specifyBindIP; 103 this.specifyCompact = specifyCompact; 104 LOG.debug(getParametersString()); 105 } 106 107 private String getParametersString() { 108 return "implType=" + implType + ", " + "specifyFramed=" + specifyFramed + ", " 109 + "specifyBindIP=" + specifyBindIP + ", " + "specifyCompact=" + specifyCompact; 110 } 111 112 @BeforeAll 113 public static void setUpBeforeClass() throws Exception { 114 TEST_UTIL.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, false); 115 TEST_UTIL.startMiniCluster(); 116 // ensure that server time increments every time we do an operation, otherwise 117 // successive puts having the same timestamp will override each other 118 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 119 } 120 121 @AfterAll 122 public static void tearDownAfterClass() throws Exception { 123 TEST_UTIL.shutdownMiniCluster(); 124 EnvironmentEdgeManager.reset(); 125 } 126 127 static ThriftServerRunner startCmdLineThread(Supplier<ThriftServer> supplier, 128 final String[] args) { 129 LOG.info("Starting HBase Thrift server with command line: " + Joiner.on(" ").join(args)); 130 ThriftServerRunner tsr = new ThriftServerRunner(supplier.get(), args); 131 tsr.setName(ThriftServer.class.getSimpleName() + "-cmdline"); 132 tsr.start(); 133 return tsr; 134 } 135 136 static int getRandomPort() { 137 return HBaseTestingUtil.randomFreePort(); 138 } 139 140 protected Supplier<ThriftServer> getThriftServerSupplier() { 141 return () -> new ThriftServer(TEST_UTIL.getConfiguration()); 142 } 143 144 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier) 145 throws Exception { 146 return createBoundServer(thriftServerSupplier, false, false); 147 } 148 149 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier, 150 boolean protocolPortClash, boolean infoPortClash) throws Exception { 151 return createBoundServer(thriftServerSupplier, null, false, false, false, protocolPortClash, 152 infoPortClash); 153 } 154 155 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier, 156 ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP) 157 throws Exception { 158 return createBoundServer(thriftServerSupplier, implType, specifyFramed, specifyCompact, 159 specifyBindIP, false, false); 160 } 161 162 /** 163 * @param protocolPortClash This param is just so we can manufacture a port clash so we can test 164 * the code does the right thing when this happens during actual test 165 * runs. Ugly but works. 166 * @see TestBindExceptionHandling#testProtocolPortClash() 167 */ 168 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier, 169 ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP, 170 boolean protocolPortClash, boolean infoPortClash) throws Exception { 171 if (protocolPortClash && infoPortClash) { 172 throw new RuntimeException("Can't set both at same time"); 173 } 174 boolean testClashOfFirstProtocolPort = protocolPortClash; 175 boolean testClashOfFirstInfoPort = infoPortClash; 176 List<String> args = new ArrayList<>(); 177 BoundSocketMaker bsm = null; 178 int port = -1; 179 ThriftServerRunner tsr = null; 180 for (int i = 0; i < 100; i++) { 181 args.clear(); 182 if (implType != null) { 183 String serverTypeOption = implType.toString(); 184 assertTrue(serverTypeOption.startsWith("-")); 185 args.add(serverTypeOption); 186 } 187 if (testClashOfFirstProtocolPort) { 188 // Test what happens if already something bound to the socket. 189 // Occupy the random port we just pulled. 190 bsm = new BoundSocketMaker(() -> getRandomPort()); 191 port = bsm.getPort(); 192 testClashOfFirstProtocolPort = false; 193 } else { 194 port = getRandomPort(); 195 } 196 args.add("-" + PORT_OPTION); 197 args.add(String.valueOf(port)); 198 args.add("-" + INFOPORT_OPTION); 199 int infoPort; 200 if (testClashOfFirstInfoPort) { 201 bsm = new BoundSocketMaker(() -> getRandomPort()); 202 infoPort = bsm.getPort(); 203 testClashOfFirstInfoPort = false; 204 } else { 205 infoPort = getRandomPort(); 206 } 207 args.add(String.valueOf(infoPort)); 208 209 if (specifyFramed) { 210 args.add("-" + FRAMED_OPTION); 211 } 212 if (specifyBindIP) { 213 args.add("-" + BIND_OPTION); 214 args.add(InetAddress.getLoopbackAddress().getHostName()); 215 } 216 if (specifyCompact) { 217 args.add("-" + COMPACT_OPTION); 218 } 219 args.add("start"); 220 221 tsr = startCmdLineThread(thriftServerSupplier, args.toArray(new String[args.size()])); 222 // wait up to 10s for the server to start 223 for (int ii = 0; ii < 100 224 && (tsr.getThriftServer().tserver == null && tsr.getRunException() == null); ii++) { 225 Threads.sleep(100); 226 } 227 if (isBindException(tsr.getRunException())) { 228 LOG.info("BindException; trying new port", tsr.getRunException()); 229 try { 230 tsr.close(); 231 tsr.join(); 232 } catch (IOException | InterruptedException ioe) { 233 LOG.warn("Exception closing", ioe); 234 } 235 continue; 236 } 237 break; 238 } 239 if (bsm != null) { 240 try { 241 bsm.close(); 242 } catch (IOException ioe) { 243 LOG.warn("Failed close", ioe); 244 } 245 } 246 if (tsr.getRunException() != null) { 247 throw tsr.getRunException(); 248 } 249 if (tsr.getThriftServer().tserver != null) { 250 Class<? extends TServer> expectedClass = 251 implType != null ? implType.serverClass : TBoundedThreadPoolServer.class; 252 assertEquals(expectedClass, tsr.getThriftServer().tserver.getClass()); 253 } 254 return tsr; 255 } 256 257 private static boolean isBindException(Exception cmdLineException) { 258 if (cmdLineException == null) { 259 return false; 260 } 261 if (cmdLineException instanceof BindException) { 262 return true; 263 } 264 if ( 265 cmdLineException.getCause() != null && cmdLineException.getCause() instanceof BindException 266 ) { 267 return true; 268 } 269 return false; 270 } 271 272 @TestTemplate 273 public void testRunThriftServer() throws Exception { 274 // Add retries in case we see stuff like connection reset 275 Exception clientSideException = null; 276 for (int i = 0; i < 10; i++) { 277 clientSideException = null; 278 ThriftServerRunner thriftServerRunner = createBoundServer(getThriftServerSupplier(), 279 this.implType, this.specifyFramed, this.specifyCompact, this.specifyBindIP); 280 try { 281 talkToThriftServer(thriftServerRunner.getThriftServer().listenPort); 282 break; 283 } catch (Exception ex) { 284 clientSideException = ex; 285 LOG.info("Exception", ex); 286 } finally { 287 LOG.debug("Stopping " + this.implType.simpleClassName() + " Thrift server"); 288 thriftServerRunner.close(); 289 thriftServerRunner.join(); 290 if (thriftServerRunner.getRunException() != null) { 291 LOG.error("Command-line invocation of HBase Thrift server threw exception", 292 thriftServerRunner.getRunException()); 293 throw thriftServerRunner.getRunException(); 294 } 295 } 296 } 297 298 if (clientSideException != null) { 299 LOG.error("Thrift Client; parameters={}", getParametersString(), clientSideException); 300 throw new Exception(clientSideException); 301 } 302 } 303 304 protected static volatile boolean tableCreated = false; 305 306 protected void talkToThriftServer(int port) throws Exception { 307 LOG.info("Talking to port={}", port); 308 TSocket sock = new TSocket(InetAddress.getLoopbackAddress().getHostName(), port); 309 TTransport transport = sock; 310 if (specifyFramed || implType.isAlwaysFramed) { 311 transport = new TFramedTransport(transport); 312 } 313 314 sock.open(); 315 try { 316 TProtocol prot; 317 if (specifyCompact) { 318 prot = new TCompactProtocol(transport); 319 } else { 320 prot = new TBinaryProtocol(transport); 321 } 322 323 Hbase.Client client = new Hbase.Client(prot); 324 if (!tableCreated) { 325 TestThriftServer.createTestTables(client); 326 tableCreated = true; 327 } 328 TestThriftServer.checkTableList(client); 329 330 } finally { 331 sock.close(); 332 } 333 } 334}