001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
022import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertTrue;
026
027import java.io.IOException;
028import java.net.InetSocketAddress;
029import java.util.ArrayList;
030import java.util.HashMap;
031import java.util.List;
032import java.util.concurrent.Callable;
033import java.util.concurrent.ThreadLocalRandom;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicReference;
036import java.util.concurrent.locks.ReadWriteLock;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.codec.Codec;
041import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
042import org.apache.hadoop.hbase.testclassification.IntegrationTests;
043import org.apache.hadoop.hbase.util.Threads;
044import org.junit.Ignore;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
053import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
054import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
055
056@Category(IntegrationTests.class)
057public class IntegrationTestRpcClient {
058
059  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestRpcClient.class);
060
061  private final Configuration conf;
062
063  private int numIterations = 10;
064
065  public IntegrationTestRpcClient() {
066    conf = HBaseConfiguration.create();
067  }
068
069  protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
070    return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
071      @Override
072      Codec getCodec() {
073        return null;
074      }
075    };
076  }
077
078  static String BIG_PAYLOAD;
079
080  static {
081    StringBuilder builder = new StringBuilder();
082
083    while (builder.length() < 1024 * 1024) { // 2 MB
084      builder.append("big.payload.");
085    }
086
087    BIG_PAYLOAD = builder.toString();
088  }
089
090  class Cluster {
091    ReadWriteLock lock = new ReentrantReadWriteLock();
092    HashMap<InetSocketAddress, RpcServer> rpcServers = new HashMap<>();
093    List<RpcServer> serverList = new ArrayList<>();
094    int maxServers;
095    int minServers;
096
097    Cluster(int minServers, int maxServers) {
098      this.minServers = minServers;
099      this.maxServers = maxServers;
100    }
101
102    RpcServer startServer() throws IOException {
103      lock.writeLock().lock();
104      try {
105        if (rpcServers.size() >= maxServers) {
106          return null;
107        }
108
109        RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
110          Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
111          new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
112        rpcServer.start();
113        InetSocketAddress address = rpcServer.getListenerAddress();
114        if (address == null) {
115          throw new IOException("Listener channel is closed");
116        }
117        rpcServers.put(address, rpcServer);
118        serverList.add(rpcServer);
119        LOG.info("Started server: " + address);
120        return rpcServer;
121      } finally {
122        lock.writeLock().unlock();
123      }
124    }
125
126    void stopRandomServer() throws Exception {
127      lock.writeLock().lock();
128      RpcServer rpcServer = null;
129      try {
130        if (rpcServers.size() <= minServers) {
131          return;
132        }
133        int size = rpcServers.size();
134        int rand = ThreadLocalRandom.current().nextInt(size);
135        rpcServer = serverList.remove(rand);
136        InetSocketAddress address = rpcServer.getListenerAddress();
137        if (address == null) {
138          // Throw exception here. We can't remove this instance from the server map because
139          // we no longer have access to its map key
140          throw new IOException("Listener channel is closed");
141        }
142        rpcServers.remove(address);
143
144        if (rpcServer != null) {
145          stopServer(rpcServer);
146        }
147      } finally {
148        lock.writeLock().unlock();
149      }
150    }
151
152    void stopServer(RpcServer rpcServer) throws InterruptedException {
153      InetSocketAddress address = rpcServer.getListenerAddress();
154      LOG.info("Stopping server: " + address);
155      rpcServer.stop();
156      rpcServer.join();
157      LOG.info("Stopped server: " + address);
158    }
159
160    void stopRunning() throws InterruptedException {
161      lock.writeLock().lock();
162      try {
163        for (RpcServer rpcServer : serverList) {
164          stopServer(rpcServer);
165        }
166
167      } finally {
168        lock.writeLock().unlock();
169      }
170    }
171
172    RpcServer getRandomServer() {
173      lock.readLock().lock();
174      try {
175        int size = rpcServers.size();
176        int rand = ThreadLocalRandom.current().nextInt(size);
177        return serverList.get(rand);
178      } finally {
179        lock.readLock().unlock();
180      }
181    }
182  }
183
184  static class MiniChaosMonkey extends Thread {
185    AtomicBoolean running = new AtomicBoolean(true);
186    AtomicReference<Exception> exception = new AtomicReference<>(null);
187    Cluster cluster;
188
189    public MiniChaosMonkey(Cluster cluster) {
190      this.cluster = cluster;
191    }
192
193    @Override
194    public void run() {
195      while (running.get()) {
196        if (ThreadLocalRandom.current().nextBoolean()) {
197          // start a server
198          try {
199            cluster.startServer();
200          } catch (Exception e) {
201            LOG.warn(e.toString(), e);
202            exception.compareAndSet(null, e);
203          }
204        } else {
205          // stop a server
206          try {
207            cluster.stopRandomServer();
208          } catch (Exception e) {
209            LOG.warn(e.toString(), e);
210            exception.compareAndSet(null, e);
211          }
212        }
213
214        Threads.sleep(100);
215      }
216    }
217
218    void stopRunning() {
219      running.set(false);
220    }
221
222    void rethrowException() throws Exception {
223      if (exception.get() != null) {
224        throw exception.get();
225      }
226    }
227  }
228
229  static class SimpleClient extends Thread {
230    AbstractRpcClient<?> rpcClient;
231    AtomicBoolean running = new AtomicBoolean(true);
232    AtomicBoolean sending = new AtomicBoolean(false);
233    AtomicReference<Throwable> exception = new AtomicReference<>(null);
234    Cluster cluster;
235    String id;
236    long numCalls = 0;
237
238    public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) {
239      this.cluster = cluster;
240      this.rpcClient = rpcClient;
241      this.id = id;
242      this.setName(id);
243    }
244
245    @Override
246    public void run() {
247      while (running.get()) {
248        boolean isBigPayload = ThreadLocalRandom.current().nextBoolean();
249        String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
250        EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
251        EchoResponseProto ret;
252        RpcServer server = cluster.getRandomServer();
253        try {
254          sending.set(true);
255          BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());
256          ret = stub.echo(null, param);
257        } catch (Exception e) {
258          LOG.warn(e.toString(), e);
259          continue; // expected in case connection is closing or closed
260        }
261
262        try {
263          assertNotNull(ret);
264          assertEquals(message, ret.getMessage());
265        } catch (Throwable t) {
266          exception.compareAndSet(null, t);
267        }
268
269        numCalls++;
270      }
271    }
272
273    void stopRunning() {
274      running.set(false);
275    }
276
277    boolean isSending() {
278      return sending.get();
279    }
280
281    void rethrowException() throws Throwable {
282      if (exception.get() != null) {
283        throw exception.get();
284      }
285    }
286  }
287
288  /*
289   * Test that not started connections are successfully removed from connection pool when rpc client
290   * is closing.
291   */
292  @Test
293  public void testRpcWithWriteThread() throws IOException, InterruptedException {
294    LOG.info("Starting test");
295    Cluster cluster = new Cluster(1, 1);
296    cluster.startServer();
297    conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
298    for (int i = 0; i < 1000; i++) {
299      AbstractRpcClient<?> rpcClient = createRpcClient(conf, true);
300      SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
301      client.start();
302      while (!client.isSending()) {
303        Thread.sleep(1);
304      }
305      client.stopRunning();
306      rpcClient.close();
307    }
308  }
309
310  @Test
311  public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
312    for (int i = 0; i < numIterations; i++) {
313      TimeoutThread.runWithTimeout(new Callable<Void>() {
314        @Override
315        public Void call() throws Exception {
316          try {
317            testRpcWithChaosMonkey(true);
318          } catch (Throwable e) {
319            if (e instanceof Exception) {
320              throw (Exception) e;
321            } else {
322              throw new Exception(e);
323            }
324          }
325          return null;
326        }
327      }, 180000);
328    }
329  }
330
331  @Test
332  @Ignore // TODO: test fails with async client
333  public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
334    for (int i = 0; i < numIterations; i++) {
335      TimeoutThread.runWithTimeout(new Callable<Void>() {
336        @Override
337        public Void call() throws Exception {
338          try {
339            testRpcWithChaosMonkey(false);
340          } catch (Throwable e) {
341            if (e instanceof Exception) {
342              throw (Exception) e;
343            } else {
344              throw new Exception(e);
345            }
346          }
347          return null;
348        }
349      }, 90000);
350    }
351  }
352
353  static class TimeoutThread extends Thread {
354    long timeout;
355
356    public TimeoutThread(long timeout) {
357      this.timeout = timeout;
358    }
359
360    @Override
361    public void run() {
362      try {
363        Thread.sleep(timeout);
364        Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
365        System.exit(1); // a timeout happened
366      } catch (InterruptedException e) {
367        // this is what we want
368      }
369    }
370
371    // runs in the same thread context but injects a timeout thread which will exit the JVM on
372    // timeout
373    static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
374      TimeoutThread thread = new TimeoutThread(timeout);
375      thread.start();
376      callable.call();
377      thread.interrupt();
378    }
379  }
380
381  public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
382    LOG.info("Starting test");
383    Cluster cluster = new Cluster(10, 100);
384    for (int i = 0; i < 10; i++) {
385      cluster.startServer();
386    }
387
388    ArrayList<SimpleClient> clients = new ArrayList<>(30);
389
390    // all threads should share the same rpc client
391    AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient);
392
393    for (int i = 0; i < 30; i++) {
394      String clientId = "client_" + i + "_";
395      LOG.info("Starting client: " + clientId);
396      SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
397      client.start();
398      clients.add(client);
399    }
400
401    LOG.info("Starting MiniChaosMonkey");
402    MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
403    cm.start();
404
405    Threads.sleep(30000);
406
407    LOG.info("Stopping MiniChaosMonkey");
408    cm.stopRunning();
409    cm.join();
410    cm.rethrowException();
411
412    LOG.info("Stopping clients");
413    for (SimpleClient client : clients) {
414      LOG.info("Stopping client: " + client.id);
415      LOG.info(client.id + " numCalls:" + client.numCalls);
416      client.stopRunning();
417      client.join();
418      client.rethrowException();
419      assertTrue(client.numCalls > 10);
420    }
421
422    LOG.info("Stopping RpcClient");
423    rpcClient.close();
424
425    LOG.info("Stopping Cluster");
426    cluster.stopRunning();
427  }
428}