001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.example;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
022
023import java.io.IOException;
024import java.net.InetSocketAddress;
025import java.util.Iterator;
026import java.util.List;
027import java.util.concurrent.ExecutionException;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.AsyncConnection;
032import org.apache.hadoop.hbase.client.ConnectionFactory;
033import org.apache.hadoop.hbase.client.Get;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.yetus.audience.InterfaceAudience;
038
039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
040import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
041import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
042import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
043import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
044import org.apache.hbase.thirdparty.io.netty.channel.Channel;
045import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
046import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
047import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
048import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
049import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
050import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
051import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
052import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
053import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
054import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
055import org.apache.hbase.thirdparty.io.netty.handler.codec.http.DefaultFullHttpResponse;
056import org.apache.hbase.thirdparty.io.netty.handler.codec.http.FullHttpRequest;
057import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpHeaderNames;
058import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpObjectAggregator;
059import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpResponseStatus;
060import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpServerCodec;
061import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpVersion;
062import org.apache.hbase.thirdparty.io.netty.handler.codec.http.QueryStringDecoder;
063import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
064
065/**
066 * A simple example on how to use {@link org.apache.hadoop.hbase.client.AsyncTable} to write a fully
067 * asynchronous HTTP proxy server. The {@link AsyncConnection} will share the same event loop with
068 * the HTTP server.
069 * <p>
070 * The request URL is:
071 *
072 * <pre>
 * http://&lt;host&gt;:&lt;port&gt;/&lt;table&gt;/&lt;row&gt;/&lt;family&gt;:&lt;qualifier&gt;
074 * </pre>
075 *
076 * Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value as the request content
077 * when doing PUT.
078 * <p>
 * Notice that the methods of the future classes all return a new Future, so there is always one
 * future whose result is never checked. That is why we need to suppress the error-prone
 * "FutureReturnValueIgnored" warnings on methods such as join and stop. In your real production
 * code, you should use your own convenient way to address the warning.
083 */
084@InterfaceAudience.Private
085public class HttpProxyExample {
086
087  private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
088
089  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
090
091  private final Configuration conf;
092
093  private final int port;
094
095  private AsyncConnection conn;
096
097  private Channel serverChannel;
098
099  private ChannelGroup channelGroup;
100
101  public HttpProxyExample(Configuration conf, int port) {
102    this.conf = conf;
103    this.port = port;
104  }
105
106  private static final class Params {
107    public final String table;
108
109    public final String row;
110
111    public final String family;
112
113    public final String qualifier;
114
115    public Params(String table, String row, String family, String qualifier) {
116      this.table = table;
117      this.row = row;
118      this.family = family;
119      this.qualifier = qualifier;
120    }
121  }
122
123  private static final class RequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
124
125    private final AsyncConnection conn;
126
127    private final ChannelGroup channelGroup;
128
129    public RequestHandler(AsyncConnection conn, ChannelGroup channelGroup) {
130      this.conn = conn;
131      this.channelGroup = channelGroup;
132    }
133
134    @Override
135    public void channelActive(ChannelHandlerContext ctx) {
136      channelGroup.add(ctx.channel());
137      ctx.fireChannelActive();
138    }
139
140    @Override
141    public void channelInactive(ChannelHandlerContext ctx) {
142      channelGroup.remove(ctx.channel());
143      ctx.fireChannelInactive();
144    }
145
146    private void write(ChannelHandlerContext ctx, HttpResponseStatus status) {
147      write(ctx, status, null);
148    }
149
150    private void write(ChannelHandlerContext ctx, HttpResponseStatus status, String content) {
151      DefaultFullHttpResponse resp;
152      if (content != null) {
153        ByteBuf buf = ctx.alloc().buffer().writeBytes(Bytes.toBytes(content));
154        resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buf);
155        resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
156      } else {
157        resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
158      }
159      resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text-plain; charset=UTF-8");
160      safeWriteAndFlush(ctx, resp);
161    }
162
163    private Params parse(FullHttpRequest req) {
164      List<String> components =
165        Splitter.on('/').splitToList(new QueryStringDecoder(req.uri()).path());
166      Preconditions.checkArgument(components.size() == 4, "Unrecognized uri: %s", req.uri());
167      Iterator<String> i = components.iterator();
168      // path is start with '/' so split will give an empty component
169      i.next();
170      String table = i.next();
171      String row = i.next();
172      List<String> cfAndCq = Splitter.on(':').splitToList(i.next());
173      Preconditions.checkArgument(cfAndCq.size() == 2, "Unrecognized uri: %s", req.uri());
174      i = cfAndCq.iterator();
175      String family = i.next();
176      String qualifier = i.next();
177      return new Params(table, row, family, qualifier);
178    }
179
180    private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
181      Params params = parse(req);
182      addListener(
183        conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
184          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))),
185        (r, e) -> {
186          if (e != null) {
187            exceptionCaught(ctx, e);
188          } else {
189            byte[] value =
190              r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
191            if (value != null) {
192              write(ctx, HttpResponseStatus.OK, Bytes.toStringBinary(value));
193            } else {
194              write(ctx, HttpResponseStatus.NOT_FOUND);
195            }
196          }
197        });
198    }
199
200    private void put(ChannelHandlerContext ctx, FullHttpRequest req) {
201      Params params = parse(req);
202      byte[] value = new byte[req.content().readableBytes()];
203      req.content().readBytes(value);
204      addListener(
205        conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
206          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)),
207        (r, e) -> {
208          if (e != null) {
209            exceptionCaught(ctx, e);
210          } else {
211            write(ctx, HttpResponseStatus.OK);
212          }
213        });
214    }
215
216    @Override
217    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
218      switch (req.method().name()) {
219        case "GET":
220          get(ctx, req);
221          break;
222        case "PUT":
223          put(ctx, req);
224          break;
225        default:
226          write(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
227          break;
228      }
229    }
230
231    @Override
232    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
233      if (cause instanceof IllegalArgumentException) {
234        write(ctx, HttpResponseStatus.BAD_REQUEST, cause.getMessage());
235      } else {
236        write(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR,
237          Throwables.getStackTraceAsString(cause));
238      }
239    }
240  }
241
242  public void start() throws InterruptedException, ExecutionException {
243    NettyRpcClientConfigHelper.setEventLoopConfig(conf, workerGroup, NioSocketChannel.class);
244    conn = ConnectionFactory.createAsyncConnection(conf).get();
245    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
246    serverChannel =
247      new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
248        .childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_REUSEADDR, true)
249        .childHandler(new ChannelInitializer<Channel>() {
250
251          @Override
252          protected void initChannel(Channel ch) throws Exception {
253            ch.pipeline().addFirst(new HttpServerCodec(), new HttpObjectAggregator(4 * 1024 * 1024),
254              new RequestHandler(conn, channelGroup));
255          }
256        }).bind(port).syncUninterruptibly().channel();
257  }
258
259  @SuppressWarnings("FutureReturnValueIgnored")
260  public void join() {
261    serverChannel.closeFuture().awaitUninterruptibly();
262  }
263
264  public int port() {
265    if (serverChannel == null) {
266      return port;
267    } else {
268      return ((InetSocketAddress) serverChannel.localAddress()).getPort();
269    }
270  }
271
272  @SuppressWarnings("FutureReturnValueIgnored")
273  public void stop() throws IOException {
274    serverChannel.close().syncUninterruptibly();
275    serverChannel = null;
276    channelGroup.close().syncUninterruptibly();
277    channelGroup = null;
278    conn.close();
279    conn = null;
280  }
281
282  public static void main(String[] args) throws InterruptedException, ExecutionException {
283    int port = Integer.parseInt(args[0]);
284    HttpProxyExample proxy = new HttpProxyExample(HBaseConfiguration.create(), port);
285    proxy.start();
286    proxy.join();
287  }
288}