001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
021import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
022import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
023import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
024import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertTrue;
027
028import java.io.IOException;
029import java.net.BindException;
030import java.net.InetAddress;
031import java.util.ArrayList;
032import java.util.Collection;
033import java.util.List;
034import java.util.function.Supplier;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.net.BoundSocketMaker;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.thrift.generated.Hbase;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
043import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
044import org.apache.hadoop.hbase.util.TableDescriptorChecker;
045import org.apache.hadoop.hbase.util.Threads;
046import org.apache.thrift.protocol.TBinaryProtocol;
047import org.apache.thrift.protocol.TCompactProtocol;
048import org.apache.thrift.protocol.TProtocol;
049import org.apache.thrift.server.TServer;
050import org.apache.thrift.transport.TSocket;
051import org.apache.thrift.transport.TTransport;
052import org.apache.thrift.transport.layered.TFramedTransport;
053import org.junit.AfterClass;
054import org.junit.BeforeClass;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.runner.RunWith;
059import org.junit.runners.Parameterized;
060import org.junit.runners.Parameterized.Parameters;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
065
066/**
067 * Start the HBase Thrift server on a random port through the command-line interface and talk to it
068 * from client side.
069 */
070@Category({ ClientTests.class, LargeTests.class })
071@RunWith(Parameterized.class)
072public class TestThriftServerCmdLine {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestThriftServerCmdLine.class);
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestThriftServerCmdLine.class);
079
080  protected final ImplType implType;
081  protected boolean specifyFramed;
082  protected boolean specifyBindIP;
083  protected boolean specifyCompact;
084
085  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
086
087  @Parameters
088  public static Collection<Object[]> getParameters() {
089    Collection<Object[]> parameters = new ArrayList<>();
090    for (ImplType implType : ImplType.values()) {
091      for (boolean specifyFramed : new boolean[] { false, true }) {
092        for (boolean specifyBindIP : new boolean[] { false, true }) {
093          if (specifyBindIP && !implType.canSpecifyBindIP) {
094            continue;
095          }
096          for (boolean specifyCompact : new boolean[] { false, true }) {
097            parameters.add(new Object[] { implType, specifyFramed, specifyBindIP, specifyCompact });
098          }
099        }
100      }
101    }
102    return parameters;
103  }
104
105  public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed, boolean specifyBindIP,
106    boolean specifyCompact) {
107    this.implType = implType;
108    this.specifyFramed = specifyFramed;
109    this.specifyBindIP = specifyBindIP;
110    this.specifyCompact = specifyCompact;
111    LOG.debug(getParametersString());
112  }
113
114  private String getParametersString() {
115    return "implType=" + implType + ", " + "specifyFramed=" + specifyFramed + ", "
116      + "specifyBindIP=" + specifyBindIP + ", " + "specifyCompact=" + specifyCompact;
117  }
118
119  @BeforeClass
120  public static void setUpBeforeClass() throws Exception {
121    TEST_UTIL.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, false);
122    TEST_UTIL.startMiniCluster();
123    // ensure that server time increments every time we do an operation, otherwise
124    // successive puts having the same timestamp will override each other
125    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
126  }
127
128  @AfterClass
129  public static void tearDownAfterClass() throws Exception {
130    TEST_UTIL.shutdownMiniCluster();
131    EnvironmentEdgeManager.reset();
132  }
133
134  static ThriftServerRunner startCmdLineThread(Supplier<ThriftServer> supplier,
135    final String[] args) {
136    LOG.info("Starting HBase Thrift server with command line: " + Joiner.on(" ").join(args));
137    ThriftServerRunner tsr = new ThriftServerRunner(supplier.get(), args);
138    tsr.setName(ThriftServer.class.getSimpleName() + "-cmdline");
139    tsr.start();
140    return tsr;
141  }
142
143  static int getRandomPort() {
144    return HBaseTestingUtil.randomFreePort();
145  }
146
147  protected Supplier<ThriftServer> getThriftServerSupplier() {
148    return () -> new ThriftServer(TEST_UTIL.getConfiguration());
149  }
150
151  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier)
152    throws Exception {
153    return createBoundServer(thriftServerSupplier, false, false);
154  }
155
156  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
157    boolean protocolPortClash, boolean infoPortClash) throws Exception {
158    return createBoundServer(thriftServerSupplier, null, false, false, false, protocolPortClash,
159      infoPortClash);
160  }
161
162  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
163    ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP)
164    throws Exception {
165    return createBoundServer(thriftServerSupplier, implType, specifyFramed, specifyCompact,
166      specifyBindIP, false, false);
167  }
168
169  /**
170   * @param protocolPortClash This param is just so we can manufacture a port clash so we can test
171   *                          the code does the right thing when this happens during actual test
172   *                          runs. Ugly but works.
173   * @see TestBindExceptionHandling#testProtocolPortClash()
174   */
175  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
176    ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP,
177    boolean protocolPortClash, boolean infoPortClash) throws Exception {
178    if (protocolPortClash && infoPortClash) {
179      throw new RuntimeException("Can't set both at same time");
180    }
181    boolean testClashOfFirstProtocolPort = protocolPortClash;
182    boolean testClashOfFirstInfoPort = infoPortClash;
183    List<String> args = new ArrayList<>();
184    BoundSocketMaker bsm = null;
185    int port = -1;
186    ThriftServerRunner tsr = null;
187    for (int i = 0; i < 100; i++) {
188      args.clear();
189      if (implType != null) {
190        String serverTypeOption = implType.toString();
191        assertTrue(serverTypeOption.startsWith("-"));
192        args.add(serverTypeOption);
193      }
194      if (testClashOfFirstProtocolPort) {
195        // Test what happens if already something bound to the socket.
196        // Occupy the random port we just pulled.
197        bsm = new BoundSocketMaker(() -> getRandomPort());
198        port = bsm.getPort();
199        testClashOfFirstProtocolPort = false;
200      } else {
201        port = getRandomPort();
202      }
203      args.add("-" + PORT_OPTION);
204      args.add(String.valueOf(port));
205      args.add("-" + INFOPORT_OPTION);
206      int infoPort;
207      if (testClashOfFirstInfoPort) {
208        bsm = new BoundSocketMaker(() -> getRandomPort());
209        infoPort = bsm.getPort();
210        testClashOfFirstInfoPort = false;
211      } else {
212        infoPort = getRandomPort();
213      }
214      args.add(String.valueOf(infoPort));
215
216      if (specifyFramed) {
217        args.add("-" + FRAMED_OPTION);
218      }
219      if (specifyBindIP) {
220        args.add("-" + BIND_OPTION);
221        args.add(InetAddress.getLoopbackAddress().getHostName());
222      }
223      if (specifyCompact) {
224        args.add("-" + COMPACT_OPTION);
225      }
226      args.add("start");
227
228      tsr = startCmdLineThread(thriftServerSupplier, args.toArray(new String[args.size()]));
229      // wait up to 10s for the server to start
230      for (int ii = 0; ii < 100
231        && (tsr.getThriftServer().tserver == null && tsr.getRunException() == null); ii++) {
232        Threads.sleep(100);
233      }
234      if (isBindException(tsr.getRunException())) {
235        LOG.info("BindException; trying new port", tsr.getRunException());
236        try {
237          tsr.close();
238          tsr.join();
239        } catch (IOException | InterruptedException ioe) {
240          LOG.warn("Exception closing", ioe);
241        }
242        continue;
243      }
244      break;
245    }
246    if (bsm != null) {
247      try {
248        bsm.close();
249      } catch (IOException ioe) {
250        LOG.warn("Failed close", ioe);
251      }
252    }
253    if (tsr.getRunException() != null) {
254      throw tsr.getRunException();
255    }
256    if (tsr.getThriftServer().tserver != null) {
257      Class<? extends TServer> expectedClass =
258        implType != null ? implType.serverClass : TBoundedThreadPoolServer.class;
259      assertEquals(expectedClass, tsr.getThriftServer().tserver.getClass());
260    }
261    return tsr;
262  }
263
264  private static boolean isBindException(Exception cmdLineException) {
265    if (cmdLineException == null) {
266      return false;
267    }
268    if (cmdLineException instanceof BindException) {
269      return true;
270    }
271    if (
272      cmdLineException.getCause() != null && cmdLineException.getCause() instanceof BindException
273    ) {
274      return true;
275    }
276    return false;
277  }
278
279  @Test
280  public void testRunThriftServer() throws Exception {
281    // Add retries in case we see stuff like connection reset
282    Exception clientSideException = null;
283    for (int i = 0; i < 10; i++) {
284      clientSideException = null;
285      ThriftServerRunner thriftServerRunner = createBoundServer(getThriftServerSupplier(),
286        this.implType, this.specifyFramed, this.specifyCompact, this.specifyBindIP);
287      try {
288        talkToThriftServer(thriftServerRunner.getThriftServer().listenPort);
289        break;
290      } catch (Exception ex) {
291        clientSideException = ex;
292        LOG.info("Exception", ex);
293      } finally {
294        LOG.debug("Stopping " + this.implType.simpleClassName() + " Thrift server");
295        thriftServerRunner.close();
296        thriftServerRunner.join();
297        if (thriftServerRunner.getRunException() != null) {
298          LOG.error("Command-line invocation of HBase Thrift server threw exception",
299            thriftServerRunner.getRunException());
300          throw thriftServerRunner.getRunException();
301        }
302      }
303    }
304
305    if (clientSideException != null) {
306      LOG.error("Thrift Client; parameters={}", getParametersString(), clientSideException);
307      throw new Exception(clientSideException);
308    }
309  }
310
311  protected static volatile boolean tableCreated = false;
312
313  protected void talkToThriftServer(int port) throws Exception {
314    LOG.info("Talking to port={}", port);
315    TSocket sock = new TSocket(InetAddress.getLoopbackAddress().getHostName(), port);
316    TTransport transport = sock;
317    if (specifyFramed || implType.isAlwaysFramed) {
318      transport = new TFramedTransport(transport);
319    }
320
321    sock.open();
322    try {
323      TProtocol prot;
324      if (specifyCompact) {
325        prot = new TCompactProtocol(transport);
326      } else {
327        prot = new TBinaryProtocol(transport);
328      }
329
330      Hbase.Client client = new Hbase.Client(prot);
331      if (!tableCreated) {
332        TestThriftServer.createTestTables(client);
333        tableCreated = true;
334      }
335      TestThriftServer.checkTableList(client);
336
337    } finally {
338      sock.close();
339    }
340  }
341}