001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.example;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.util.concurrent.ExecutionException;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseConfiguration;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.AsyncConnection;
029import org.apache.hadoop.hbase.client.ConnectionFactory;
030import org.apache.hadoop.hbase.client.Get;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.yetus.audience.InterfaceAudience;
035
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
038import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
039import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
040import org.apache.hbase.thirdparty.io.netty.channel.Channel;
041import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
042import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
043import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
044import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
045import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
046import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
047import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
048import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
049import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
050import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
051import org.apache.hbase.thirdparty.io.netty.handler.codec.http.DefaultFullHttpResponse;
052import org.apache.hbase.thirdparty.io.netty.handler.codec.http.FullHttpRequest;
053import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpHeaderNames;
054import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpObjectAggregator;
055import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpResponseStatus;
056import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpServerCodec;
057import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpVersion;
058import org.apache.hbase.thirdparty.io.netty.handler.codec.http.QueryStringDecoder;
059import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
060
061/**
062 * A simple example on how to use {@link org.apache.hadoop.hbase.client.AsyncTable} to write a fully
063 * asynchronous HTTP proxy server. The {@link AsyncConnection} will share the same event loop with
064 * the HTTP server.
065 * <p>
066 * The request URL is:
067 *
068 * <pre>
069 * http://&lt;host&gt;:&lt;port&gt;/&lt;table&gt;/&lt;rowgt;/&lt;family&gt;:&lt;qualifier&gt;
070 * </pre>
071 *
072 * Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value as the request content
073 * when doing PUT.
074 */
075@InterfaceAudience.Private
076public class HttpProxyExample {
077
078  private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
079
080  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
081
082  private final Configuration conf;
083
084  private final int port;
085
086  private AsyncConnection conn;
087
088  private Channel serverChannel;
089
090  private ChannelGroup channelGroup;
091
092  public HttpProxyExample(Configuration conf, int port) {
093    this.conf = conf;
094    this.port = port;
095  }
096
097  private static final class Params {
098    public final String table;
099
100    public final String row;
101
102    public final String family;
103
104    public final String qualifier;
105
106    public Params(String table, String row, String family, String qualifier) {
107      this.table = table;
108      this.row = row;
109      this.family = family;
110      this.qualifier = qualifier;
111    }
112  }
113
114  private static final class RequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
115
116    private final AsyncConnection conn;
117
118    private final ChannelGroup channelGroup;
119
120    public RequestHandler(AsyncConnection conn, ChannelGroup channelGroup) {
121      this.conn = conn;
122      this.channelGroup = channelGroup;
123    }
124
125    @Override
126    public void channelActive(ChannelHandlerContext ctx) {
127      channelGroup.add(ctx.channel());
128      ctx.fireChannelActive();
129    }
130
131    @Override
132    public void channelInactive(ChannelHandlerContext ctx) {
133      channelGroup.remove(ctx.channel());
134      ctx.fireChannelInactive();
135    }
136
137    private void write(ChannelHandlerContext ctx, HttpResponseStatus status) {
138      write(ctx, status, null);
139    }
140
141    private void write(ChannelHandlerContext ctx, HttpResponseStatus status,
142        String content) {
143      DefaultFullHttpResponse resp;
144      if (content != null) {
145        ByteBuf buf = ctx.alloc().buffer().writeBytes(Bytes.toBytes(content));
146        resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buf);
147        resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
148      } else {
149        resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
150      }
151      resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text-plain; charset=UTF-8");
152      ctx.writeAndFlush(resp);
153    }
154
155    private Params parse(FullHttpRequest req) {
156      String[] components = new QueryStringDecoder(req.uri()).path().split("/");
157      Preconditions.checkArgument(components.length == 4, "Unrecognized uri: %s", req.uri());
158      // path is start with '/' so split will give an empty component
159      String[] cfAndCq = components[3].split(":");
160      Preconditions.checkArgument(cfAndCq.length == 2, "Unrecognized uri: %s", req.uri());
161      return new Params(components[1], components[2], cfAndCq[0], cfAndCq[1]);
162    }
163
164    private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
165      Params params = parse(req);
166      addListener(
167        conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
168          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))),
169        (r, e) -> {
170          if (e != null) {
171            exceptionCaught(ctx, e);
172          } else {
173            byte[] value =
174              r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
175            if (value != null) {
176              write(ctx, HttpResponseStatus.OK, Bytes.toStringBinary(value));
177            } else {
178              write(ctx, HttpResponseStatus.NOT_FOUND);
179            }
180          }
181        });
182    }
183
184    private void put(ChannelHandlerContext ctx, FullHttpRequest req) {
185      Params params = parse(req);
186      byte[] value = new byte[req.content().readableBytes()];
187      req.content().readBytes(value);
188      addListener(
189        conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
190          .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)),
191        (r, e) -> {
192          if (e != null) {
193            exceptionCaught(ctx, e);
194          } else {
195            write(ctx, HttpResponseStatus.OK);
196          }
197        });
198    }
199
200    @Override
201    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
202      switch (req.method().name()) {
203        case "GET":
204          get(ctx, req);
205          break;
206        case "PUT":
207          put(ctx, req);
208          break;
209        default:
210          write(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
211          break;
212      }
213    }
214
215    @Override
216    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
217      if (cause instanceof IllegalArgumentException) {
218        write(ctx, HttpResponseStatus.BAD_REQUEST, cause.getMessage());
219      } else {
220        write(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR,
221          Throwables.getStackTraceAsString(cause));
222      }
223    }
224  }
225
226  public void start() throws InterruptedException, ExecutionException {
227    NettyRpcClientConfigHelper.setEventLoopConfig(conf, workerGroup, NioSocketChannel.class);
228    conn = ConnectionFactory.createAsyncConnection(conf).get();
229    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
230    serverChannel = new ServerBootstrap().group(bossGroup, workerGroup)
231        .channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, true)
232        .childOption(ChannelOption.SO_REUSEADDR, true)
233        .childHandler(new ChannelInitializer<Channel>() {
234
235          @Override
236          protected void initChannel(Channel ch) throws Exception {
237            ch.pipeline().addFirst(new HttpServerCodec(), new HttpObjectAggregator(4 * 1024 * 1024),
238              new RequestHandler(conn, channelGroup));
239          }
240        }).bind(port).syncUninterruptibly().channel();
241  }
242
243  public void join() {
244    serverChannel.closeFuture().awaitUninterruptibly();
245  }
246
247  public int port() {
248    if (serverChannel == null) {
249      return port;
250    } else {
251      return ((InetSocketAddress) serverChannel.localAddress()).getPort();
252    }
253  }
254
255  public void stop() throws IOException {
256    serverChannel.close().syncUninterruptibly();
257    serverChannel = null;
258    channelGroup.close().syncUninterruptibly();
259    channelGroup = null;
260    conn.close();
261    conn = null;
262  }
263
264  public static void main(String[] args) throws InterruptedException, ExecutionException {
265    int port = Integer.parseInt(args[0]);
266    HttpProxyExample proxy = new HttpProxyExample(HBaseConfiguration.create(), port);
267    proxy.start();
268    proxy.join();
269  }
270}