001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION; 021import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION; 022import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION; 023import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION; 024import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertTrue; 027 028import java.io.IOException; 029import java.net.BindException; 030import java.net.InetAddress; 031import java.util.ArrayList; 032import java.util.Collection; 033import java.util.List; 034import java.util.function.Supplier; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.net.BoundSocketMaker; 038import org.apache.hadoop.hbase.testclassification.ClientTests; 039import org.apache.hadoop.hbase.testclassification.LargeTests; 040import org.apache.hadoop.hbase.thrift.generated.Hbase; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 043import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 044import org.apache.hadoop.hbase.util.TableDescriptorChecker; 045import org.apache.hadoop.hbase.util.Threads; 046import org.apache.thrift.protocol.TBinaryProtocol; 047import org.apache.thrift.protocol.TCompactProtocol; 048import org.apache.thrift.protocol.TProtocol; 049import org.apache.thrift.server.TServer; 050import org.apache.thrift.transport.TSocket; 051import org.apache.thrift.transport.TTransport; 052import org.apache.thrift.transport.layered.TFramedTransport; 053import org.junit.AfterClass; 054import org.junit.BeforeClass; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.runner.RunWith; 059import org.junit.runners.Parameterized; 060import org.junit.runners.Parameterized.Parameters; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 065 066/** 067 * Start the HBase Thrift server on a random port through the command-line interface and talk to it 068 * from client side. 069 */ 070@Category({ ClientTests.class, LargeTests.class }) 071@RunWith(Parameterized.class) 072public class TestThriftServerCmdLine { 073 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestThriftServerCmdLine.class); 077 078 private static final Logger LOG = LoggerFactory.getLogger(TestThriftServerCmdLine.class); 079 080 protected final ImplType implType; 081 protected boolean specifyFramed; 082 protected boolean specifyBindIP; 083 protected boolean specifyCompact; 084 085 protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 086 087 @Parameters 088 public static Collection<Object[]> getParameters() { 089 Collection<Object[]> parameters = new ArrayList<>(); 090 for (ImplType implType : ImplType.values()) { 091 for (boolean specifyFramed : new boolean[] { false, true }) { 092 for (boolean specifyBindIP : new boolean[] { false, true }) { 093 if (specifyBindIP && !implType.canSpecifyBindIP) { 094 continue; 095 } 096 for (boolean specifyCompact : new boolean[] { false, true }) { 097 parameters.add(new Object[] { implType, specifyFramed, specifyBindIP, specifyCompact }); 098 } 099 } 100 } 101 } 102 return parameters; 103 } 104 105 public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed, boolean specifyBindIP, 106 boolean specifyCompact) { 107 this.implType = implType; 108 this.specifyFramed = specifyFramed; 109 this.specifyBindIP = specifyBindIP; 110 this.specifyCompact = specifyCompact; 111 LOG.debug(getParametersString()); 112 } 113 114 private String getParametersString() { 115 return "implType=" + implType + ", " + "specifyFramed=" + specifyFramed + ", " 116 + "specifyBindIP=" + specifyBindIP + ", " + "specifyCompact=" + specifyCompact; 117 } 118 119 @BeforeClass 120 public static void setUpBeforeClass() throws Exception { 121 TEST_UTIL.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, false); 122 TEST_UTIL.startMiniCluster(); 123 // ensure that server time increments every time we do an operation, otherwise 124 // successive puts having the same timestamp will override each other 125 EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge()); 126 } 127 128 @AfterClass 129 public static void tearDownAfterClass() throws Exception { 130 TEST_UTIL.shutdownMiniCluster(); 131 EnvironmentEdgeManager.reset(); 132 } 133 134 static ThriftServerRunner startCmdLineThread(Supplier<ThriftServer> supplier, 135 final String[] args) { 136 LOG.info("Starting HBase Thrift server with command line: " + Joiner.on(" ").join(args)); 137 ThriftServerRunner tsr = new ThriftServerRunner(supplier.get(), args); 138 tsr.setName(ThriftServer.class.getSimpleName() + "-cmdline"); 139 tsr.start(); 140 return tsr; 141 } 142 143 static int getRandomPort() { 144 return HBaseTestingUtility.randomFreePort(); 145 } 146 147 protected Supplier<ThriftServer> getThriftServerSupplier() { 148 return () -> new ThriftServer(TEST_UTIL.getConfiguration()); 149 } 150 151 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier) 152 throws Exception { 153 return createBoundServer(thriftServerSupplier, false, false); 154 } 155 156 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier, 157 boolean protocolPortClash, boolean infoPortClash) throws Exception { 158 return createBoundServer(thriftServerSupplier, null, false, false, false, protocolPortClash, 159 infoPortClash); 160 } 161 162 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier, 163 ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP) 164 throws Exception { 165 return createBoundServer(thriftServerSupplier, implType, specifyFramed, specifyCompact, 166 specifyBindIP, false, false); 167 } 168 169 /** 170 * @param protocolPortClash This param is just so we can manufacture a port clash so we can test 171 * the code does the right thing when this happens during actual test 172 * runs. Ugly but works. 173 * @see TestBindExceptionHandling#testProtocolPortClash() 174 */ 175 static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier, 176 ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP, 177 boolean protocolPortClash, boolean infoPortClash) throws Exception { 178 if (protocolPortClash && infoPortClash) { 179 throw new RuntimeException("Can't set both at same time"); 180 } 181 boolean testClashOfFirstProtocolPort = protocolPortClash; 182 boolean testClashOfFirstInfoPort = infoPortClash; 183 List<String> args = new ArrayList<>(); 184 BoundSocketMaker bsm = null; 185 int port = -1; 186 ThriftServerRunner tsr = null; 187 for (int i = 0; i < 100; i++) { 188 args.clear(); 189 if (implType != null) { 190 String serverTypeOption = implType.toString(); 191 assertTrue(serverTypeOption.startsWith("-")); 192 args.add(serverTypeOption); 193 } 194 if (testClashOfFirstProtocolPort) { 195 // Test what happens if already something bound to the socket. 196 // Occupy the random port we just pulled. 197 bsm = new BoundSocketMaker(() -> getRandomPort()); 198 port = bsm.getPort(); 199 testClashOfFirstProtocolPort = false; 200 } else { 201 port = getRandomPort(); 202 } 203 args.add("-" + PORT_OPTION); 204 args.add(String.valueOf(port)); 205 args.add("-" + INFOPORT_OPTION); 206 int infoPort; 207 if (testClashOfFirstInfoPort) { 208 bsm = new BoundSocketMaker(() -> getRandomPort()); 209 infoPort = bsm.getPort(); 210 testClashOfFirstInfoPort = false; 211 } else { 212 infoPort = getRandomPort(); 213 } 214 args.add(String.valueOf(infoPort)); 215 216 if (specifyFramed) { 217 args.add("-" + FRAMED_OPTION); 218 } 219 if (specifyBindIP) { 220 args.add("-" + BIND_OPTION); 221 args.add(InetAddress.getLoopbackAddress().getHostName()); 222 } 223 if (specifyCompact) { 224 args.add("-" + COMPACT_OPTION); 225 } 226 args.add("start"); 227 228 tsr = startCmdLineThread(thriftServerSupplier, args.toArray(new String[args.size()])); 229 // wait up to 10s for the server to start 230 for (int ii = 0; ii < 100 231 && (tsr.getThriftServer().tserver == null && tsr.getRunException() == null); ii++) { 232 Threads.sleep(100); 233 } 234 if (isBindException(tsr.getRunException())) { 235 LOG.info("BindException; trying new port", tsr.getRunException()); 236 try { 237 tsr.close(); 238 tsr.join(); 239 } catch (IOException | InterruptedException ioe) { 240 LOG.warn("Exception closing", ioe); 241 } 242 continue; 243 } 244 break; 245 } 246 if (bsm != null) { 247 try { 248 bsm.close(); 249 } catch (IOException ioe) { 250 LOG.warn("Failed close", ioe); 251 } 252 } 253 if (tsr.getRunException() != null) { 254 throw tsr.getRunException(); 255 } 256 if (tsr.getThriftServer().tserver != null) { 257 Class<? extends TServer> expectedClass = 258 implType != null ? implType.serverClass : TBoundedThreadPoolServer.class; 259 assertEquals(expectedClass, tsr.getThriftServer().tserver.getClass()); 260 } 261 return tsr; 262 } 263 264 private static boolean isBindException(Exception cmdLineException) { 265 if (cmdLineException == null) { 266 return false; 267 } 268 if (cmdLineException instanceof BindException) { 269 return true; 270 } 271 if ( 272 cmdLineException.getCause() != null && cmdLineException.getCause() instanceof BindException 273 ) { 274 return true; 275 } 276 return false; 277 } 278 279 @Test 280 public void testRunThriftServer() throws Exception { 281 // Add retries in case we see stuff like connection reset 282 Exception clientSideException = null; 283 for (int i = 0; i < 10; i++) { 284 clientSideException = null; 285 ThriftServerRunner thriftServerRunner = createBoundServer(getThriftServerSupplier(), 286 this.implType, this.specifyFramed, this.specifyCompact, this.specifyBindIP); 287 try { 288 talkToThriftServer(thriftServerRunner.getThriftServer().listenPort); 289 break; 290 } catch (Exception ex) { 291 clientSideException = ex; 292 LOG.info("Exception", ex); 293 } finally { 294 LOG.debug("Stopping " + this.implType.simpleClassName() + " Thrift server"); 295 thriftServerRunner.close(); 296 thriftServerRunner.join(); 297 if (thriftServerRunner.getRunException() != null) { 298 LOG.error("Command-line invocation of HBase Thrift server threw exception", 299 thriftServerRunner.getRunException()); 300 throw thriftServerRunner.getRunException(); 301 } 302 } 303 } 304 305 if (clientSideException != null) { 306 LOG.error("Thrift Client; parameters={}", getParametersString(), clientSideException); 307 throw new Exception(clientSideException); 308 } 309 } 310 311 protected static volatile boolean tableCreated = false; 312 313 protected void talkToThriftServer(int port) throws Exception { 314 LOG.info("Talking to port={}", port); 315 TSocket sock = new TSocket(InetAddress.getLoopbackAddress().getHostName(), port); 316 TTransport transport = sock; 317 if (specifyFramed || implType.isAlwaysFramed) { 318 transport = new TFramedTransport(transport); 319 } 320 321 sock.open(); 322 try { 323 TProtocol prot; 324 if (specifyCompact) { 325 prot = new TCompactProtocol(transport); 326 } else { 327 prot = new TBinaryProtocol(transport); 328 } 329 330 Hbase.Client client = new Hbase.Client(prot); 331 if (!tableCreated) { 332 TestThriftServer.createTestTables(client); 333 tableCreated = true; 334 } 335 TestThriftServer.checkTableList(client); 336 337 } finally { 338 sock.close(); 339 } 340 } 341}