001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
021import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
022import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
023import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
024import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
025import static org.junit.jupiter.api.Assertions.assertEquals;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027
028import java.io.IOException;
029import java.net.BindException;
030import java.net.InetAddress;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.function.Supplier;
034import java.util.stream.Stream;
035import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.net.BoundSocketMaker;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.thrift.generated.Hbase;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
043import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
044import org.apache.hadoop.hbase.util.TableDescriptorChecker;
045import org.apache.hadoop.hbase.util.Threads;
046import org.apache.thrift.protocol.TBinaryProtocol;
047import org.apache.thrift.protocol.TCompactProtocol;
048import org.apache.thrift.protocol.TProtocol;
049import org.apache.thrift.server.TServer;
050import org.apache.thrift.transport.TSocket;
051import org.apache.thrift.transport.TTransport;
052import org.apache.thrift.transport.layered.TFramedTransport;
053import org.junit.jupiter.api.AfterAll;
054import org.junit.jupiter.api.BeforeAll;
055import org.junit.jupiter.api.Tag;
056import org.junit.jupiter.api.TestTemplate;
057import org.junit.jupiter.params.provider.Arguments;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
062
063/**
064 * Start the HBase Thrift server on a random port through the command-line interface and talk to it
065 * from client side.
066 */
067@Tag(ClientTests.TAG)
068@Tag(LargeTests.TAG)
069@HBaseParameterizedTestTemplate
070public class TestThriftServerCmdLine {
071
072  private static final Logger LOG = LoggerFactory.getLogger(TestThriftServerCmdLine.class);
073
074  protected final ImplType implType;
075  protected boolean specifyFramed;
076  protected boolean specifyBindIP;
077  protected boolean specifyCompact;
078
079  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
080
081  public static Stream<Arguments> parameters() {
082    List<Arguments> params = new ArrayList<>();
083    for (ImplType implType : ImplType.values()) {
084      for (boolean specifyFramed : new boolean[] { false, true }) {
085        for (boolean specifyBindIP : new boolean[] { false, true }) {
086          if (specifyBindIP && !implType.canSpecifyBindIP) {
087            continue;
088          }
089          for (boolean specifyCompact : new boolean[] { false, true }) {
090            params.add(Arguments.of(implType, specifyFramed, specifyBindIP, specifyCompact));
091          }
092        }
093      }
094    }
095    return params.stream();
096  }
097
098  public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed, boolean specifyBindIP,
099    boolean specifyCompact) {
100    this.implType = implType;
101    this.specifyFramed = specifyFramed;
102    this.specifyBindIP = specifyBindIP;
103    this.specifyCompact = specifyCompact;
104    LOG.debug(getParametersString());
105  }
106
107  private String getParametersString() {
108    return "implType=" + implType + ", " + "specifyFramed=" + specifyFramed + ", "
109      + "specifyBindIP=" + specifyBindIP + ", " + "specifyCompact=" + specifyCompact;
110  }
111
112  @BeforeAll
113  public static void setUpBeforeClass() throws Exception {
114    TEST_UTIL.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, false);
115    TEST_UTIL.startMiniCluster();
116    // ensure that server time increments every time we do an operation, otherwise
117    // successive puts having the same timestamp will override each other
118    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
119  }
120
121  @AfterAll
122  public static void tearDownAfterClass() throws Exception {
123    TEST_UTIL.shutdownMiniCluster();
124    EnvironmentEdgeManager.reset();
125  }
126
127  static ThriftServerRunner startCmdLineThread(Supplier<ThriftServer> supplier,
128    final String[] args) {
129    LOG.info("Starting HBase Thrift server with command line: " + Joiner.on(" ").join(args));
130    ThriftServerRunner tsr = new ThriftServerRunner(supplier.get(), args);
131    tsr.setName(ThriftServer.class.getSimpleName() + "-cmdline");
132    tsr.start();
133    return tsr;
134  }
135
136  static int getRandomPort() {
137    return HBaseTestingUtil.randomFreePort();
138  }
139
140  protected Supplier<ThriftServer> getThriftServerSupplier() {
141    return () -> new ThriftServer(TEST_UTIL.getConfiguration());
142  }
143
144  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier)
145    throws Exception {
146    return createBoundServer(thriftServerSupplier, false, false);
147  }
148
149  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
150    boolean protocolPortClash, boolean infoPortClash) throws Exception {
151    return createBoundServer(thriftServerSupplier, null, false, false, false, protocolPortClash,
152      infoPortClash);
153  }
154
155  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
156    ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP)
157    throws Exception {
158    return createBoundServer(thriftServerSupplier, implType, specifyFramed, specifyCompact,
159      specifyBindIP, false, false);
160  }
161
162  /**
163   * @param protocolPortClash This param is just so we can manufacture a port clash so we can test
164   *                          the code does the right thing when this happens during actual test
165   *                          runs. Ugly but works.
166   * @see TestBindExceptionHandling#testProtocolPortClash()
167   */
168  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
169    ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP,
170    boolean protocolPortClash, boolean infoPortClash) throws Exception {
171    if (protocolPortClash && infoPortClash) {
172      throw new RuntimeException("Can't set both at same time");
173    }
174    boolean testClashOfFirstProtocolPort = protocolPortClash;
175    boolean testClashOfFirstInfoPort = infoPortClash;
176    List<String> args = new ArrayList<>();
177    BoundSocketMaker bsm = null;
178    int port = -1;
179    ThriftServerRunner tsr = null;
180    for (int i = 0; i < 100; i++) {
181      args.clear();
182      if (implType != null) {
183        String serverTypeOption = implType.toString();
184        assertTrue(serverTypeOption.startsWith("-"));
185        args.add(serverTypeOption);
186      }
187      if (testClashOfFirstProtocolPort) {
188        // Test what happens if already something bound to the socket.
189        // Occupy the random port we just pulled.
190        bsm = new BoundSocketMaker(() -> getRandomPort());
191        port = bsm.getPort();
192        testClashOfFirstProtocolPort = false;
193      } else {
194        port = getRandomPort();
195      }
196      args.add("-" + PORT_OPTION);
197      args.add(String.valueOf(port));
198      args.add("-" + INFOPORT_OPTION);
199      int infoPort;
200      if (testClashOfFirstInfoPort) {
201        bsm = new BoundSocketMaker(() -> getRandomPort());
202        infoPort = bsm.getPort();
203        testClashOfFirstInfoPort = false;
204      } else {
205        infoPort = getRandomPort();
206      }
207      args.add(String.valueOf(infoPort));
208
209      if (specifyFramed) {
210        args.add("-" + FRAMED_OPTION);
211      }
212      if (specifyBindIP) {
213        args.add("-" + BIND_OPTION);
214        args.add(InetAddress.getLoopbackAddress().getHostName());
215      }
216      if (specifyCompact) {
217        args.add("-" + COMPACT_OPTION);
218      }
219      args.add("start");
220
221      tsr = startCmdLineThread(thriftServerSupplier, args.toArray(new String[args.size()]));
222      // wait up to 10s for the server to start
223      for (int ii = 0; ii < 100
224        && (tsr.getThriftServer().tserver == null && tsr.getRunException() == null); ii++) {
225        Threads.sleep(100);
226      }
227      if (isBindException(tsr.getRunException())) {
228        LOG.info("BindException; trying new port", tsr.getRunException());
229        try {
230          tsr.close();
231          tsr.join();
232        } catch (IOException | InterruptedException ioe) {
233          LOG.warn("Exception closing", ioe);
234        }
235        continue;
236      }
237      break;
238    }
239    if (bsm != null) {
240      try {
241        bsm.close();
242      } catch (IOException ioe) {
243        LOG.warn("Failed close", ioe);
244      }
245    }
246    if (tsr.getRunException() != null) {
247      throw tsr.getRunException();
248    }
249    if (tsr.getThriftServer().tserver != null) {
250      Class<? extends TServer> expectedClass =
251        implType != null ? implType.serverClass : TBoundedThreadPoolServer.class;
252      assertEquals(expectedClass, tsr.getThriftServer().tserver.getClass());
253    }
254    return tsr;
255  }
256
257  private static boolean isBindException(Exception cmdLineException) {
258    if (cmdLineException == null) {
259      return false;
260    }
261    if (cmdLineException instanceof BindException) {
262      return true;
263    }
264    if (
265      cmdLineException.getCause() != null && cmdLineException.getCause() instanceof BindException
266    ) {
267      return true;
268    }
269    return false;
270  }
271
272  @TestTemplate
273  public void testRunThriftServer() throws Exception {
274    // Add retries in case we see stuff like connection reset
275    Exception clientSideException = null;
276    for (int i = 0; i < 10; i++) {
277      clientSideException = null;
278      ThriftServerRunner thriftServerRunner = createBoundServer(getThriftServerSupplier(),
279        this.implType, this.specifyFramed, this.specifyCompact, this.specifyBindIP);
280      try {
281        talkToThriftServer(thriftServerRunner.getThriftServer().listenPort);
282        break;
283      } catch (Exception ex) {
284        clientSideException = ex;
285        LOG.info("Exception", ex);
286      } finally {
287        LOG.debug("Stopping " + this.implType.simpleClassName() + " Thrift server");
288        thriftServerRunner.close();
289        thriftServerRunner.join();
290        if (thriftServerRunner.getRunException() != null) {
291          LOG.error("Command-line invocation of HBase Thrift server threw exception",
292            thriftServerRunner.getRunException());
293          throw thriftServerRunner.getRunException();
294        }
295      }
296    }
297
298    if (clientSideException != null) {
299      LOG.error("Thrift Client; parameters={}", getParametersString(), clientSideException);
300      throw new Exception(clientSideException);
301    }
302  }
303
304  protected static volatile boolean tableCreated = false;
305
306  protected void talkToThriftServer(int port) throws Exception {
307    LOG.info("Talking to port={}", port);
308    TSocket sock = new TSocket(InetAddress.getLoopbackAddress().getHostName(), port);
309    TTransport transport = sock;
310    if (specifyFramed || implType.isAlwaysFramed) {
311      transport = new TFramedTransport(transport);
312    }
313
314    sock.open();
315    try {
316      TProtocol prot;
317      if (specifyCompact) {
318        prot = new TCompactProtocol(transport);
319      } else {
320        prot = new TBinaryProtocol(transport);
321      }
322
323      Hbase.Client client = new Hbase.Client(prot);
324      if (!tableCreated) {
325        TestThriftServer.createTestTables(client);
326        tableCreated = true;
327      }
328      TestThriftServer.checkTableList(client);
329
330    } finally {
331      sock.close();
332    }
333  }
334}