/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;

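/**
 * Integration test for the HBase RPC clients. Starts a small set of in-process
 * {@link RpcServer}s, hammers them with concurrent echo calls from many client threads, and
 * randomly starts and stops servers underneath the clients to verify that a shared RPC client
 * survives connection churn.
 */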
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRpcClient.class);

  private final Configuration conf;

  private int numIterations = 10;

  public IntegrationTestRpcClient() {
    conf = HBaseConfiguration.create();
  }

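  // Creates either a blocking (sync) client or a Netty-based async client. The async variant
  // overrides getCodec() to return null, so requests are sent as plain protobuf without
  // cellblocks.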
  protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
    return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
      @Override
      protected Codec getCodec() {
        return null;
      }
    };
  }

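  // A large echo payload; roughly half of all client calls send this instead of a short message.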
  static String BIG_PAYLOAD;

  static {
    StringBuilder builder = new StringBuilder();

    while (builder.length() < 1024 * 1024) { // 1M chars (~2 MB as UTF-16)
      builder.append("big.payload.");
    }

    BIG_PAYLOAD = builder.toString();
  }

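  /**
   * A set of in-process RPC servers, guarded by a read-write lock so that the chaos monkey can
   * start and stop servers while clients concurrently pick random targets.
   */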
  class Cluster {
    ReadWriteLock lock = new ReentrantReadWriteLock();
    HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>();
    List<RpcServer> serverList = new ArrayList<>();
    int maxServers;
    int minServers;

    Cluster(int minServers, int maxServers) {
      this.minServers = minServers;
      this.maxServers = maxServers;
    }

    RpcServer startServer() throws IOException {
      lock.writeLock().lock();
      try {
        if (rpcServers.size() >= maxServers) {
          return null;
        }

        RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
          Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
          new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
        rpcServer.start();
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          throw new IOException("Listener channel is closed");
        }
        rpcServers.put(address, rpcServer);
        serverList.add(rpcServer);
        LOG.info("Started server: " + address);
        return rpcServer;
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopRandomServer() throws Exception {
      lock.writeLock().lock();
      try {
        if (rpcServers.size() <= minServers) {
          return;
        }
        int rand = ThreadLocalRandom.current().nextInt(rpcServers.size());
        RpcServer rpcServer = serverList.remove(rand);
        InetSocketAddress address = rpcServer.getListenerAddress();
        if (address == null) {
          // Throw here: without the listener address we cannot remove this instance from the
          // server map because we no longer have access to its map key.
          throw new IOException("Listener channel is closed");
        }
        rpcServers.remove(address);
        stopServer(rpcServer);
      } finally {
        lock.writeLock().unlock();
      }
    }

    void stopServer(RpcServer rpcServer) throws InterruptedException {
      InetSocketAddress address = rpcServer.getListenerAddress();
      LOG.info("Stopping server: " + address);
      rpcServer.stop();
      rpcServer.join();
      LOG.info("Stopped server: " + address);
    }

    void stopRunning() throws InterruptedException {
      lock.writeLock().lock();
      try {
        for (RpcServer rpcServer : serverList) {
          stopServer(rpcServer);
        }
      } finally {
        lock.writeLock().unlock();
      }
    }

    RpcServer getRandomServer() {
      lock.readLock().lock();
      try {
        int rand = ThreadLocalRandom.current().nextInt(rpcServers.size());
        return serverList.get(rand);
      } finally {
        lock.readLock().unlock();
      }
    }
  }

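  /**
   * On each tick, flips a coin and either starts a new server or stops a random one, staying
   * within the cluster's min/max bounds. The first exception seen is kept and rethrown to the
   * test thread.
   */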
  static class MiniChaosMonkey extends Thread {
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicReference<Exception> exception = new AtomicReference<>(null);
    Cluster cluster;

    public MiniChaosMonkey(Cluster cluster) {
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (running.get()) {
        if (ThreadLocalRandom.current().nextBoolean()) {
          // start a server
          try {
            cluster.startServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        } else {
          // stop a server
          try {
            cluster.stopRandomServer();
          } catch (Exception e) {
            LOG.warn(e.toString(), e);
            exception.compareAndSet(null, e);
          }
        }

        Threads.sleep(100);
      }
    }

    void stopRunning() {
      running.set(false);
    }

    void rethrowException() throws Exception {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

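  /**
   * A client thread that repeatedly echoes either a short unique message or {@link #BIG_PAYLOAD}
   * against a random server. Connection errors are expected while servers churn and are only
   * logged; assertion failures are kept and rethrown to the test thread.
   */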
  static class SimpleClient extends Thread {
    AbstractRpcClient<?> rpcClient;
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicBoolean sending = new AtomicBoolean(false);
    AtomicReference<Throwable> exception = new AtomicReference<>(null);
    Cluster cluster;
    String id;
    long numCalls = 0;

    public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) {
      this.cluster = cluster;
      this.rpcClient = rpcClient;
      this.id = id;
      this.setName(id);
    }

    @Override
    @SuppressWarnings("AssertionFailureIgnored") // intended
    public void run() {
      while (running.get()) {
        boolean isBigPayload = ThreadLocalRandom.current().nextBoolean();
        String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
        EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
        EchoResponseProto ret;
        RpcServer server = cluster.getRandomServer();
        try {
          sending.set(true);
          BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());
          ret = stub.echo(null, param);
        } catch (Exception e) {
          LOG.warn(e.toString(), e);
          continue; // expected in case connection is closing or closed
        }

        try {
          assertNotNull(ret);
          assertEquals(message, ret.getMessage());
        } catch (Throwable t) {
          exception.compareAndSet(null, t);
        }

        numCalls++;
      }
    }

    void stopRunning() {
      running.set(false);
    }

    boolean isSending() {
      return sending.get();
    }

    void rethrowException() throws Throwable {
      if (exception.get() != null) {
        throw exception.get();
      }
    }
  }

  /*
   * Test that connections that were never started are successfully removed from the connection
   * pool when the RPC client is closing.
   */
  @Test
  public void testRpcWithWriteThread() throws IOException, InterruptedException {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(1, 1);
    cluster.startServer();
    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
    for (int i = 0; i < 1000; i++) {
      AbstractRpcClient<?> rpcClient = createRpcClient(conf, true);
      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
      client.start();
      while (!client.isSending()) {
        Thread.sleep(1);
      }
      client.stopRunning();
      rpcClient.close();
    }
  }

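  // Each chaos-monkey iteration runs under TimeoutThread, which dumps all thread stacks and
  // exits the JVM if the iteration hangs, so a wedged RPC client cannot stall the whole suite.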
  @Test
  public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(true);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 180000);
    }
  }

  @Test
  @Ignore // TODO: test fails with async client
  public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
    for (int i = 0; i < numIterations; i++) {
      TimeoutThread.runWithTimeout(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          try {
            testRpcWithChaosMonkey(false);
          } catch (Throwable e) {
            if (e instanceof Exception) {
              throw (Exception) e;
            } else {
              throw new Exception(e);
            }
          }
          return null;
        }
      }, 90000);
    }
  }

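  /**
   * Watchdog that sleeps for the given timeout, then dumps all thread stacks and exits the JVM.
   * Interrupting it before the timeout elapses cancels the watchdog.
   */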
  static class TimeoutThread extends Thread {
    long timeout;

    public TimeoutThread(long timeout) {
      this.timeout = timeout;
    }

    @Override
    public void run() {
      try {
        Thread.sleep(timeout);
        Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
        System.exit(1); // a timeout happened
      } catch (InterruptedException e) {
        // this is what we want: the callable finished in time and cancelled the watchdog
      }
    }

    // Runs the callable in the calling thread, with a watchdog thread alongside that will exit
    // the JVM if the callable does not finish within the timeout.
    static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
      TimeoutThread thread = new TimeoutThread(timeout);
      thread.start();
      callable.call();
      thread.interrupt();
    }
  }

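  /**
   * Shared body for the chaos-monkey tests: starts ten servers, runs thirty client threads
   * against a single shared RPC client while the chaos monkey churns servers for thirty seconds,
   * then verifies that no client failed and that each made more than ten successful calls.
   */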
  public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
    LOG.info("Starting test");
    Cluster cluster = new Cluster(10, 100);
    for (int i = 0; i < 10; i++) {
      cluster.startServer();
    }

    ArrayList<SimpleClient> clients = new ArrayList<>(30);

    // all threads should share the same rpc client
    AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient);

    for (int i = 0; i < 30; i++) {
      String clientId = "client_" + i + "_";
      LOG.info("Starting client: " + clientId);
      SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
      client.start();
      clients.add(client);
    }

    LOG.info("Starting MiniChaosMonkey");
    MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
    cm.start();

    Threads.sleep(30000);

    LOG.info("Stopping MiniChaosMonkey");
    cm.stopRunning();
    cm.join();
    cm.rethrowException();

    LOG.info("Stopping clients");
    for (SimpleClient client : clients) {
      LOG.info("Stopping client: " + client.id);
      LOG.info(client.id + " numCalls:" + client.numCalls);
      client.stopRunning();
      client.join();
      client.rethrowException();
      assertTrue(client.numCalls > 10);
    }

    LOG.info("Stopping RpcClient");
    rpcClient.close();

    LOG.info("Stopping Cluster");
    cluster.stopRunning();
  }
}