001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
021import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
022import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
023import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
024import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertTrue;
027import java.io.IOException;
028import java.net.BindException;
029import java.net.InetAddress;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.List;
033import java.util.function.Supplier;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.net.BoundSocketMaker;
037import org.apache.hadoop.hbase.testclassification.ClientTests;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.thrift.generated.Hbase;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
042import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
043import org.apache.hadoop.hbase.util.TableDescriptorChecker;
044import org.apache.hadoop.hbase.util.Threads;
045import org.apache.thrift.protocol.TBinaryProtocol;
046import org.apache.thrift.protocol.TCompactProtocol;
047import org.apache.thrift.protocol.TProtocol;
048import org.apache.thrift.server.TServer;
049import org.apache.thrift.transport.TSocket;
050import org.apache.thrift.transport.TTransport;
051import org.apache.thrift.transport.layered.TFramedTransport;
052import org.junit.AfterClass;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.junit.runner.RunWith;
058import org.junit.runners.Parameterized;
059import org.junit.runners.Parameterized.Parameters;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
063
064/**
065 * Start the HBase Thrift server on a random port through the command-line
066 * interface and talk to it from client side.
067 */
068@Category({ClientTests.class, LargeTests.class})
069@RunWith(Parameterized.class)
070public class TestThriftServerCmdLine {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestThriftServerCmdLine.class);
075
076  private static final Logger LOG =
077      LoggerFactory.getLogger(TestThriftServerCmdLine.class);
078
079  protected final ImplType implType;
080  protected boolean specifyFramed;
081  protected boolean specifyBindIP;
082  protected boolean specifyCompact;
083
084  protected static final HBaseTestingUtil TEST_UTIL =
085      new HBaseTestingUtil();
086
087  @Parameters
088  public static Collection<Object[]> getParameters() {
089    Collection<Object[]> parameters = new ArrayList<>();
090    for (ImplType implType : ImplType.values()) {
091      for (boolean specifyFramed : new boolean[] {false, true}) {
092        for (boolean specifyBindIP : new boolean[] {false, true}) {
093          if (specifyBindIP && !implType.canSpecifyBindIP) {
094            continue;
095          }
096          for (boolean specifyCompact : new boolean[] {false, true}) {
097            parameters.add(new Object[] {
098              implType, specifyFramed, specifyBindIP, specifyCompact
099            });
100          }
101        }
102      }
103    }
104    return parameters;
105  }
106
107  public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed,
108      boolean specifyBindIP, boolean specifyCompact) {
109    this.implType = implType;
110    this.specifyFramed = specifyFramed;
111    this.specifyBindIP = specifyBindIP;
112    this.specifyCompact = specifyCompact;
113    LOG.debug(getParametersString());
114  }
115
116  private String getParametersString() {
117    return "implType=" + implType + ", " +
118        "specifyFramed=" + specifyFramed + ", " +
119        "specifyBindIP=" + specifyBindIP + ", " +
120        "specifyCompact=" + specifyCompact;
121  }
122
123  @BeforeClass
124  public static void setUpBeforeClass() throws Exception {
125    TEST_UTIL.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, false);
126    TEST_UTIL.startMiniCluster();
127    //ensure that server time increments every time we do an operation, otherwise
128    //successive puts having the same timestamp will override each other
129    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
130  }
131
132  @AfterClass
133  public static void tearDownAfterClass() throws Exception {
134    TEST_UTIL.shutdownMiniCluster();
135    EnvironmentEdgeManager.reset();
136  }
137
138  static ThriftServerRunner startCmdLineThread(Supplier<ThriftServer> supplier,
139      final String[] args) {
140    LOG.info("Starting HBase Thrift server with command line: " + Joiner.on(" ").join(args));
141    ThriftServerRunner tsr = new ThriftServerRunner(supplier.get(), args);
142    tsr.setName(ThriftServer.class.getSimpleName() + "-cmdline");
143    tsr.start();
144    return tsr;
145  }
146
147  static int getRandomPort() {
148    return HBaseTestingUtil.randomFreePort();
149  }
150
151  protected Supplier<ThriftServer> getThriftServerSupplier() {
152    return () -> new ThriftServer(TEST_UTIL.getConfiguration());
153  }
154
155  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier)
156      throws Exception {
157    return createBoundServer(thriftServerSupplier, false, false);
158  }
159
160  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
161      boolean protocolPortClash, boolean infoPortClash) throws Exception {
162    return createBoundServer(thriftServerSupplier, null, false, false,
163      false, protocolPortClash, infoPortClash);
164  }
165
166  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
167      ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP)
168      throws Exception {
169    return createBoundServer(thriftServerSupplier, implType, specifyFramed, specifyCompact,
170      specifyBindIP, false, false);
171  }
172
173  /**
174   * @param protocolPortClash This param is just so we can manufacture a port clash so we can test
175   *   the code does the right thing when this happens during actual test runs. Ugly but works.
176   * @see TestBindExceptionHandling#testProtocolPortClash()
177   */
178  static ThriftServerRunner createBoundServer(Supplier<ThriftServer> thriftServerSupplier,
179      ImplType implType, boolean specifyFramed, boolean specifyCompact, boolean specifyBindIP,
180      boolean protocolPortClash, boolean infoPortClash) throws Exception {
181    if (protocolPortClash && infoPortClash) {
182      throw new RuntimeException("Can't set both at same time");
183    }
184    boolean testClashOfFirstProtocolPort = protocolPortClash;
185    boolean testClashOfFirstInfoPort = infoPortClash;
186    List<String> args = new ArrayList<>();
187    BoundSocketMaker bsm = null;
188    int port = -1;
189    ThriftServerRunner tsr = null;
190    for (int i = 0; i < 100; i++) {
191      args.clear();
192      if (implType != null) {
193        String serverTypeOption = implType.toString();
194        assertTrue(serverTypeOption.startsWith("-"));
195        args.add(serverTypeOption);
196      }
197      if (testClashOfFirstProtocolPort) {
198        // Test what happens if already something bound to the socket.
199        // Occupy the random port we just pulled.
200        bsm = new BoundSocketMaker(() -> getRandomPort());
201        port = bsm.getPort();
202        testClashOfFirstProtocolPort = false;
203      } else {
204        port = getRandomPort();
205      }
206      args.add("-" + PORT_OPTION);
207      args.add(String.valueOf(port));
208      args.add("-" + INFOPORT_OPTION);
209      int infoPort;
210      if (testClashOfFirstInfoPort) {
211        bsm = new BoundSocketMaker(() -> getRandomPort());
212        infoPort = bsm.getPort();
213        testClashOfFirstInfoPort = false;
214      } else {
215        infoPort = getRandomPort();
216      }
217      args.add(String.valueOf(infoPort));
218
219      if (specifyFramed) {
220        args.add("-" + FRAMED_OPTION);
221      }
222      if (specifyBindIP) {
223        args.add("-" + BIND_OPTION);
224        args.add(InetAddress.getLoopbackAddress().getHostName());
225      }
226      if (specifyCompact) {
227        args.add("-" + COMPACT_OPTION);
228      }
229      args.add("start");
230
231      tsr = startCmdLineThread(thriftServerSupplier, args.toArray(new String[args.size()]));
232      // wait up to 10s for the server to start
233      for (int ii = 0; ii < 100 && (tsr.getThriftServer().tserver == null &&
234          tsr.getRunException() == null); ii++) {
235        Threads.sleep(100);
236      }
237      if (isBindException(tsr.getRunException())) {
238        LOG.info("BindException; trying new port", tsr.getRunException());
239        try {
240          tsr.close();
241          tsr.join();
242        } catch (IOException | InterruptedException ioe) {
243          LOG.warn("Exception closing", ioe);
244        }
245        continue;
246      }
247      break;
248    }
249    if (bsm != null) {
250      try {
251        bsm.close();
252      } catch (IOException ioe) {
253        LOG.warn("Failed close", ioe);
254      }
255    }
256    if (tsr.getRunException() != null) {
257      throw tsr.getRunException();
258    }
259    if (tsr.getThriftServer().tserver != null) {
260      Class<? extends TServer> expectedClass =
261        implType != null ? implType.serverClass : TBoundedThreadPoolServer.class;
262      assertEquals(expectedClass, tsr.getThriftServer().tserver.getClass());
263    }
264    return tsr;
265  }
266
267  private static boolean isBindException(Exception cmdLineException) {
268    if (cmdLineException == null) {
269      return false;
270    }
271    if (cmdLineException instanceof BindException) {
272      return true;
273    }
274    if (cmdLineException.getCause() != null &&
275        cmdLineException.getCause() instanceof BindException) {
276      return true;
277    }
278    return false;
279  }
280
281  @Test
282  public void testRunThriftServer() throws Exception {
283    // Add retries in case we see stuff like connection reset
284    Exception clientSideException =  null;
285    for (int i = 0; i < 10; i++) {
286      clientSideException =  null;
287      ThriftServerRunner thriftServerRunner = createBoundServer(getThriftServerSupplier(),
288        this.implType, this.specifyFramed, this.specifyCompact, this.specifyBindIP);
289      try {
290        talkToThriftServer(thriftServerRunner.getThriftServer().listenPort);
291        break;
292      } catch (Exception ex) {
293        clientSideException = ex;
294        LOG.info("Exception", ex);
295      } finally {
296        LOG.debug("Stopping " + this.implType.simpleClassName() + " Thrift server");
297        thriftServerRunner.close();
298        thriftServerRunner.join();
299        if (thriftServerRunner.getRunException() != null) {
300          LOG.error("Command-line invocation of HBase Thrift server threw exception",
301            thriftServerRunner.getRunException());
302          throw thriftServerRunner.getRunException();
303        }
304      }
305    }
306
307    if (clientSideException != null) {
308      LOG.error("Thrift Client; parameters={}", getParametersString(), clientSideException);
309      throw new Exception(clientSideException);
310    }
311  }
312
313  protected static volatile boolean tableCreated = false;
314
315  protected void talkToThriftServer(int port) throws Exception {
316    LOG.info("Talking to port={}", port);
317    TSocket sock = new TSocket(InetAddress.getLoopbackAddress().getHostName(), port);
318    TTransport transport = sock;
319    if (specifyFramed || implType.isAlwaysFramed) {
320      transport = new TFramedTransport(transport);
321    }
322
323    sock.open();
324    try {
325      TProtocol prot;
326      if (specifyCompact) {
327        prot = new TCompactProtocol(transport);
328      } else {
329        prot = new TBinaryProtocol(transport);
330      }
331
332      Hbase.Client client = new Hbase.Client(prot);
333      if (!tableCreated){
334        TestThriftServer.createTestTables(client);
335        tableCreated = true;
336      }
337      TestThriftServer.checkTableList(client);
338
339    } finally {
340      sock.close();
341    }
342  }
343}
344