001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
021import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
022import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
023import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
024import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertTrue;
027import java.io.IOException;
028import java.net.BindException;
029import java.net.InetAddress;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.List;
033import java.util.function.Supplier;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.net.BoundSocketMaker;
037import org.apache.hadoop.hbase.testclassification.ClientTests;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.thrift.generated.Hbase;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
042import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
043import org.apache.hadoop.hbase.util.TableDescriptorChecker;
044import org.apache.hadoop.hbase.util.Threads;
045import org.apache.thrift.protocol.TBinaryProtocol;
046import org.apache.thrift.protocol.TCompactProtocol;
047import org.apache.thrift.protocol.TProtocol;
048import org.apache.thrift.server.TServer;
049import org.apache.thrift.transport.TFramedTransport;
050import org.apache.thrift.transport.TSocket;
051import org.apache.thrift.transport.TTransport;
052import org.junit.AfterClass;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.junit.runner.RunWith;
058import org.junit.runners.Parameterized;
059import org.junit.runners.Parameterized.Parameters;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
063
064/**
065 * Start the HBase Thrift server on a random port through the command-line
066 * interface and talk to it from client side.
067 */
068@Category({ClientTests.class, LargeTests.class})
069@RunWith(Parameterized.class)
070public class TestThriftServerCmdLine {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestThriftServerCmdLine.class);
075
076  private static final Logger LOG =
077      LoggerFactory.getLogger(TestThriftServerCmdLine.class);
078
079  protected final ImplType implType;
080  protected boolean specifyFramed;
081  protected boolean specifyBindIP;
082  protected boolean specifyCompact;
083
084  protected static final HBaseTestingUtility TEST_UTIL =
085      new HBaseTestingUtility();
086
087  @Parameters
088  public static Collection<Object[]> getParameters() {
089    Collection<Object[]> parameters = new ArrayList<>();
090    for (ImplType implType : ImplType.values()) {
091      for (boolean specifyFramed : new boolean[] {false, true}) {
092        for (boolean specifyBindIP : new boolean[] {false, true}) {
093          if (specifyBindIP && !implType.canSpecifyBindIP) {
094            continue;
095          }
096          for (boolean specifyCompact : new boolean[] {false, true}) {
097            parameters.add(new Object[]{implType, specifyFramed,
098              specifyBindIP, specifyCompact});
099          }
100        }
101      }
102    }
103    return parameters;
104  }
105
106  public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed,
107      boolean specifyBindIP, boolean specifyCompact) {
108    this.implType = implType;
109    this.specifyFramed = specifyFramed;
110    this.specifyBindIP = specifyBindIP;
111    this.specifyCompact = specifyCompact;
112    LOG.debug(getParametersString());
113  }
114
115  private String getParametersString() {
116    return "implType=" + implType + ", " +
117        "specifyFramed=" + specifyFramed + ", " +
118        "specifyBindIP=" + specifyBindIP + ", " +
119        "specifyCompact=" + specifyCompact;
120  }
121
122  @BeforeClass
123  public static void setUpBeforeClass() throws Exception {
124    TEST_UTIL.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, false);
125    TEST_UTIL.startMiniCluster();
126    //ensure that server time increments every time we do an operation, otherwise
127    //successive puts having the same timestamp will override each other
128    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
129  }
130
131  @AfterClass
132  public static void tearDownAfterClass() throws Exception {
133    TEST_UTIL.shutdownMiniCluster();
134    EnvironmentEdgeManager.reset();
135  }
136
137  static ThriftServerRunner startCmdLineThread(Supplier<ThriftServer> supplier,
138      final String[] args) {
139    LOG.info("Starting HBase Thrift server with command line: " + Joiner.on(" ").join(args));
140    ThriftServerRunner tsr = new ThriftServerRunner(supplier.get(), args);
141    tsr.setName(ThriftServer.class.getSimpleName() + "-cmdline");
142    tsr.start();
143    return tsr;
144  }
145
146  static int getRandomPort() {
147    return HBaseTestingUtility.randomFreePort();
148  }
149
150  protected Supplier<ThriftServer> getThriftServerSupplier() {
151    return () -> new ThriftServer(TEST_UTIL.getConfiguration());
152  }
153
154  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier)
155      throws Exception {
156    return createBoundServer(thriftServerSupplier, false, false);
157  }
158
159  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
160      boolean protocolPortClash, boolean infoPortClash) throws Exception {
161    return createBoundServer(thriftServerSupplier, null, false, false,
162      false, protocolPortClash, infoPortClash);
163  }
164
165  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
166      ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP)
167      throws Exception {
168    return createBoundServer(thriftServerSupplier, implType, specifyFramed, specifyCompact,
169      specifyBindIP, false, false);
170  }
171
172  /**
173   * @param protocolPortClash This param is just so we can manufacture a port clash so we can test
174   *   the code does the right thing when this happens during actual test runs. Ugly but works.
175   * @see TestBindExceptionHandling#testProtocolPortClash()
176   */
177  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
178      ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP,
179      boolean protocolPortClash, boolean infoPortClash) throws Exception {
180    if (protocolPortClash && infoPortClash) {
181      throw new RuntimeException("Can't set both at same time");
182    }
183    boolean testClashOfFirstProtocolPort = protocolPortClash;
184    boolean testClashOfFirstInfoPort = infoPortClash;
185    List<String> args = new ArrayList<>();
186    BoundSocketMaker bsm = null;
187    int port = -1;
188    ThriftServerRunner tsr = null;
189    for (int i = 0; i < 100; i++) {
190      args.clear();
191      if (implType != null) {
192        String serverTypeOption = implType.toString();
193        assertTrue(serverTypeOption.startsWith("-"));
194        args.add(serverTypeOption);
195      }
196      if (testClashOfFirstProtocolPort) {
197        // Test what happens if already something bound to the socket.
198        // Occupy the random port we just pulled.
199        bsm = new BoundSocketMaker(() -> getRandomPort());
200        port = bsm.getPort();
201        testClashOfFirstProtocolPort = false;
202      } else {
203        port = getRandomPort();
204      }
205      args.add("-" + PORT_OPTION);
206      args.add(String.valueOf(port));
207      args.add("-" + INFOPORT_OPTION);
208      int infoPort;
209      if (testClashOfFirstInfoPort) {
210        bsm = new BoundSocketMaker(() -> getRandomPort());
211        infoPort = bsm.getPort();
212        testClashOfFirstInfoPort = false;
213      } else {
214        infoPort = getRandomPort();
215      }
216      args.add(String.valueOf(infoPort));
217
218      if (specifyFramed) {
219        args.add("-" + FRAMED_OPTION);
220      }
221      if (specifyBindIP) {
222        args.add("-" + BIND_OPTION);
223        args.add(InetAddress.getLoopbackAddress().getHostName());
224      }
225      if (specifyCompact) {
226        args.add("-" + COMPACT_OPTION);
227      }
228      args.add("start");
229
230      tsr = startCmdLineThread(thriftServerSupplier, args.toArray(new String[args.size()]));
231      // wait up to 10s for the server to start
232      for (int ii = 0; ii < 100 && (tsr.getThriftServer().tserver == null &&
233          tsr.getRunException() == null); ii++) {
234        Threads.sleep(100);
235      }
236      if (isBindException(tsr.getRunException())) {
237        LOG.info("BindException; trying new port", tsr.getRunException());
238        try {
239          tsr.close();
240          tsr.join();
241        } catch (IOException | InterruptedException ioe) {
242          LOG.warn("Exception closing", ioe);
243        }
244        continue;
245      }
246      break;
247    }
248    if (bsm != null) {
249      try {
250        bsm.close();
251      } catch (IOException ioe) {
252        LOG.warn("Failed close", ioe);
253      }
254    }
255    if (tsr.getRunException() != null) {
256      throw tsr.getRunException();
257    }
258    if (tsr.getThriftServer().tserver != null) {
259      Class<? extends TServer> expectedClass =
260        implType != null ? implType.serverClass : TBoundedThreadPoolServer.class;
261      assertEquals(expectedClass, tsr.getThriftServer().tserver.getClass());
262    }
263    return tsr;
264  }
265
266  private static boolean isBindException(Exception cmdLineException) {
267    if (cmdLineException == null) {
268      return false;
269    }
270    if (cmdLineException instanceof BindException) {
271      return true;
272    }
273    if (cmdLineException.getCause() != null &&
274        cmdLineException.getCause() instanceof BindException) {
275      return true;
276    }
277    return false;
278  }
279
280  @Test
281  public void testRunThriftServer() throws Exception {
282    // Add retries in case we see stuff like connection reset
283    Exception clientSideException =  null;
284    for (int i = 0; i < 10; i++) {
285      clientSideException =  null;
286      ThriftServerRunner thriftServerRunner = createBoundServer(getThriftServerSupplier(),
287        this.implType, this.specifyFramed, this.specifyCompact, this.specifyBindIP);
288      try {
289        talkToThriftServer(thriftServerRunner.getThriftServer().listenPort);
290        break;
291      } catch (Exception ex) {
292        clientSideException = ex;
293        LOG.info("Exception", ex);
294      } finally {
295        LOG.debug("Stopping " + this.implType.simpleClassName() + " Thrift server");
296        thriftServerRunner.close();
297        thriftServerRunner.join();
298        if (thriftServerRunner.getRunException() != null) {
299          LOG.error("Command-line invocation of HBase Thrift server threw exception",
300            thriftServerRunner.getRunException());
301          throw thriftServerRunner.getRunException();
302        }
303      }
304    }
305
306    if (clientSideException != null) {
307      LOG.error("Thrift Client; parameters={}", getParametersString(), clientSideException);
308      throw new Exception(clientSideException);
309    }
310  }
311
312  protected static volatile boolean tableCreated = false;
313
314  protected void talkToThriftServer(int port) throws Exception {
315    LOG.info("Talking to port={}", port);
316    TSocket sock = new TSocket(InetAddress.getLoopbackAddress().getHostName(), port);
317    TTransport transport = sock;
318    if (specifyFramed || implType.isAlwaysFramed) {
319      transport = new TFramedTransport(transport);
320    }
321
322    sock.open();
323    try {
324      TProtocol prot;
325      if (specifyCompact) {
326        prot = new TCompactProtocol(transport);
327      } else {
328        prot = new TBinaryProtocol(transport);
329      }
330
331      Hbase.Client client = new Hbase.Client(prot);
332      if (!tableCreated){
333        TestThriftServer.createTestTables(client);
334        tableCreated = true;
335      }
336      TestThriftServer.checkTableList(client);
337
338    } finally {
339      sock.close();
340    }
341  }
342}
343