001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION; 021import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION; 022import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION; 023import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION; 024import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertTrue; 027import java.io.IOException; 028import java.net.BindException; 029import java.net.InetAddress; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.List; 033import java.util.function.Supplier; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.net.BoundSocketMaker; 037import org.apache.hadoop.hbase.testclassification.ClientTests; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.thrift.generated.Hbase; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 042import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 043import org.apache.hadoop.hbase.util.TableDescriptorChecker; 044import org.apache.hadoop.hbase.util.Threads; 045import org.apache.thrift.protocol.TBinaryProtocol; 046import org.apache.thrift.protocol.TCompactProtocol; 047import org.apache.thrift.protocol.TProtocol; 048import org.apache.thrift.server.TServer; 049import org.apache.thrift.transport.TFramedTransport; 050import org.apache.thrift.transport.TSocket; 051import org.apache.thrift.transport.TTransport; 052import org.junit.AfterClass; 053import org.junit.BeforeClass; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.junit.runner.RunWith; 058import org.junit.runners.Parameterized; 059import org.junit.runners.Parameterized.Parameters; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 063 064/** 065 * Start the HBase Thrift server on a random port through the command-line 066 * interface and talk to it from client side. 067 */ 068@Category({ClientTests.class, LargeTests.class}) 069@RunWith(Parameterized.class) 070public class TestThriftServerCmdLine { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestThriftServerCmdLine.class); 075 076 private static final Logger LOG = 077 LoggerFactory.getLogger(TestThriftServerCmdLine.class); 078 079 protected final ImplType implType; 080 protected boolean specifyFramed; 081 protected boolean specifyBindIP; 082 protected boolean specifyCompact; 083 084 protected static final HBaseTestingUtility TEST_UTIL = 085 new HBaseTestingUtility(); 086 087 @Parameters 088 public static Collection<Object[]> getParameters() { 089 Collection<Object[]> parameters = new ArrayList<>(); 090 for (ImplType implType : ImplType.values()) { 091 for (boolean specifyFramed : new boolean[] {false, true}) { 092 for (boolean specifyBindIP : new boolean[] {false, true}) { 093 if (specifyBindIP && !implType.canSpecifyBindIP) { 094 continue; 095 } 096 for (boolean specifyCompact : new boolean[] {false, true}) { 097 parameters.add(new Object[]{implType, specifyFramed, 098 specifyBindIP, specifyCompact}); 099 } 100 } 101 } 102 } 103 return parameters; 104 } 105 106 public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed, 107 boolean specifyBindIP, boolean specifyCompact) { 108 this.implType = implType; 109 this.specifyFramed = specifyFramed; 110 this.specifyBindIP = specifyBindIP; 111 this.specifyCompact = specifyCompact; 112 LOG.debug(getParametersString()); 113 } 114 115 private String getParametersString() { 116 return "implType=" + implType + ", " + 117 "specifyFramed=" + specifyFramed + ", " + 118 "specifyBindIP=" + specifyBindIP + ", " + 119 "specifyCompact=" + specifyCompact; 120 } 121 122 @BeforeClass 123 public static void setUpBeforeClass() throws Exception { 124 TEST_UTIL.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, false); 125 TEST_UTIL.startMiniCluster(); 126 //ensure that server time increments every time we do an operation, otherwise 127 //successive puts having the same timestamp will override each other 128 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 129 } 130 131 @AfterClass 132 public static void tearDownAfterClass() throws Exception { 133 TEST_UTIL.shutdownMiniCluster(); 134 EnvironmentEdgeManager.reset(); 135 } 136 137 static ThriftServerRunner startCmdLineThread(Supplier<ThriftServer> supplier, 138 final String[] args) { 139 LOG.info("Starting HBase Thrift server with command line: " + Joiner.on(" ").join(args)); 140 ThriftServerRunner tsr = new ThriftServerRunner(supplier.get(), args); 141 tsr.setName(ThriftServer.class.getSimpleName() + "-cmdline"); 142 tsr.start(); 143 return tsr; 144 } 145 146 static int getRandomPort() { 147 return HBaseTestingUtility.randomFreePort(); 148 } 149 150 protected Supplier<ThriftServer> getThriftServerSupplier() { 151 return () -> new ThriftServer(TEST_UTIL.getConfiguration()); 152 } 153 154 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier) 155 throws Exception { 156 return createBoundServer(thriftServerSupplier, false, false); 157 } 158 159 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier, 160 boolean protocolPortClash, boolean infoPortClash) throws Exception { 161 return createBoundServer(thriftServerSupplier, null, false, false, 162 false, protocolPortClash, infoPortClash); 163 } 164 165 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier, 166 ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP) 167 throws Exception { 168 return createBoundServer(thriftServerSupplier, implType, specifyFramed, specifyCompact, 169 specifyBindIP, false, false); 170 } 171 172 /** 173 * @param protocolPortClash This param is just so we can manufacture a port clash so we can test 174 * the code does the right thing when this happens during actual test runs. Ugly but works. 175 * @see TestBindExceptionHandling#testProtocolPortClash() 176 */ 177 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier, 178 ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP, 179 boolean protocolPortClash, boolean infoPortClash) throws Exception { 180 if (protocolPortClash && infoPortClash) { 181 throw new RuntimeException("Can't set both at same time"); 182 } 183 boolean testClashOfFirstProtocolPort = protocolPortClash; 184 boolean testClashOfFirstInfoPort = infoPortClash; 185 List<String> args = new ArrayList<>(); 186 BoundSocketMaker bsm = null; 187 int port = -1; 188 ThriftServerRunner tsr = null; 189 for (int i = 0; i < 100; i++) { 190 args.clear(); 191 if (implType != null) { 192 String serverTypeOption = implType.toString(); 193 assertTrue(serverTypeOption.startsWith("-")); 194 args.add(serverTypeOption); 195 } 196 if (testClashOfFirstProtocolPort) { 197 // Test what happens if already something bound to the socket. 198 // Occupy the random port we just pulled. 199 bsm = new BoundSocketMaker(() -> getRandomPort()); 200 port = bsm.getPort(); 201 testClashOfFirstProtocolPort = false; 202 } else { 203 port = getRandomPort(); 204 } 205 args.add("-" + PORT_OPTION); 206 args.add(String.valueOf(port)); 207 args.add("-" + INFOPORT_OPTION); 208 int infoPort; 209 if (testClashOfFirstInfoPort) { 210 bsm = new BoundSocketMaker(() -> getRandomPort()); 211 infoPort = bsm.getPort(); 212 testClashOfFirstInfoPort = false; 213 } else { 214 infoPort = getRandomPort(); 215 } 216 args.add(String.valueOf(infoPort)); 217 218 if (specifyFramed) { 219 args.add("-" + FRAMED_OPTION); 220 } 221 if (specifyBindIP) { 222 args.add("-" + BIND_OPTION); 223 args.add(InetAddress.getLoopbackAddress().getHostName()); 224 } 225 if (specifyCompact) { 226 args.add("-" + COMPACT_OPTION); 227 } 228 args.add("start"); 229 230 tsr = startCmdLineThread(thriftServerSupplier, args.toArray(new String[args.size()])); 231 // wait up to 10s for the server to start 232 for (int ii = 0; ii < 100 && (tsr.getThriftServer().tserver == null && 233 tsr.getRunException() == null); ii++) { 234 Threads.sleep(100); 235 } 236 if (isBindException(tsr.getRunException())) { 237 LOG.info("BindException; trying new port", tsr.getRunException()); 238 try { 239 tsr.close(); 240 tsr.join(); 241 } catch (IOException | InterruptedException ioe) { 242 LOG.warn("Exception closing", ioe); 243 } 244 continue; 245 } 246 break; 247 } 248 if (bsm != null) { 249 try { 250 bsm.close(); 251 } catch (IOException ioe) { 252 LOG.warn("Failed close", ioe); 253 } 254 } 255 if (tsr.getRunException() != null) { 256 throw tsr.getRunException(); 257 } 258 if (tsr.getThriftServer().tserver != null) { 259 Class<? extends TServer> expectedClass = 260 implType != null ? implType.serverClass : TBoundedThreadPoolServer.class; 261 assertEquals(expectedClass, tsr.getThriftServer().tserver.getClass()); 262 } 263 return tsr; 264 } 265 266 private static boolean isBindException(Exception cmdLineException) { 267 if (cmdLineException == null) { 268 return false; 269 } 270 if (cmdLineException instanceof BindException) { 271 return true; 272 } 273 if (cmdLineException.getCause() != null && 274 cmdLineException.getCause() instanceof BindException) { 275 return true; 276 } 277 return false; 278 } 279 280 @Test 281 public void testRunThriftServer() throws Exception { 282 // Add retries in case we see stuff like connection reset 283 Exception clientSideException = null; 284 for (int i = 0; i < 10; i++) { 285 clientSideException = null; 286 ThriftServerRunner thriftServerRunner = createBoundServer(getThriftServerSupplier(), 287 this.implType, this.specifyFramed, this.specifyCompact, this.specifyBindIP); 288 try { 289 talkToThriftServer(thriftServerRunner.getThriftServer().listenPort); 290 break; 291 } catch (Exception ex) { 292 clientSideException = ex; 293 LOG.info("Exception", ex); 294 } finally { 295 LOG.debug("Stopping " + this.implType.simpleClassName() + " Thrift server"); 296 thriftServerRunner.close(); 297 thriftServerRunner.join(); 298 if (thriftServerRunner.getRunException() != null) { 299 LOG.error("Command-line invocation of HBase Thrift server threw exception", 300 thriftServerRunner.getRunException()); 301 throw thriftServerRunner.getRunException(); 302 } 303 } 304 } 305 306 if (clientSideException != null) { 307 LOG.error("Thrift Client; parameters={}", getParametersString(), clientSideException); 308 throw new Exception(clientSideException); 309 } 310 } 311 312 protected static volatile boolean tableCreated = false; 313 314 protected void talkToThriftServer(int port) throws Exception { 315 LOG.info("Talking to port={}", port); 316 TSocket sock = new TSocket(InetAddress.getLoopbackAddress().getHostName(), port); 317 TTransport transport = sock; 318 if (specifyFramed || implType.isAlwaysFramed) { 319 transport = new TFramedTransport(transport); 320 } 321 322 sock.open(); 323 try { 324 TProtocol prot; 325 if (specifyCompact) { 326 prot = new TCompactProtocol(transport); 327 } else { 328 prot = new TBinaryProtocol(transport); 329 } 330 331 Hbase.Client client = new Hbase.Client(prot); 332 if (!tableCreated){ 333 TestThriftServer.createTestTables(client); 334 tableCreated = true; 335 } 336 TestThriftServer.checkTableList(client); 337 338 } finally { 339 sock.close(); 340 } 341 } 342} 343