001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.example; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.util.concurrent.ExecutionException; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HBaseConfiguration; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.AsyncConnection; 029import org.apache.hadoop.hbase.client.ConnectionFactory; 030import org.apache.hadoop.hbase.client.Get; 031import org.apache.hadoop.hbase.client.Put; 032import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.yetus.audience.InterfaceAudience; 035 036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 037import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 038import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap; 039import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 040import org.apache.hbase.thirdparty.io.netty.channel.Channel; 041import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 042import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 043import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 044import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 045import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 046import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; 047import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; 048import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 049import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel; 050import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 051import org.apache.hbase.thirdparty.io.netty.handler.codec.http.DefaultFullHttpResponse; 052import org.apache.hbase.thirdparty.io.netty.handler.codec.http.FullHttpRequest; 053import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpHeaderNames; 054import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpObjectAggregator; 055import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpResponseStatus; 056import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpServerCodec; 057import org.apache.hbase.thirdparty.io.netty.handler.codec.http.HttpVersion; 058import org.apache.hbase.thirdparty.io.netty.handler.codec.http.QueryStringDecoder; 059import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor; 060 061/** 062 * A simple example on how to use {@link org.apache.hadoop.hbase.client.AsyncTable} to write a fully 063 * asynchronous HTTP proxy server. The {@link AsyncConnection} will share the same event loop with 064 * the HTTP server. 065 * <p> 066 * The request URL is: 067 * 068 * <pre> 069 * http://<host>:<port>/<table>/<rowgt;/<family>:<qualifier> 070 * </pre> 071 * 072 * Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value as the request content 073 * when doing PUT. 074 */ 075@InterfaceAudience.Private 076public class HttpProxyExample { 077 078 private final EventLoopGroup bossGroup = new NioEventLoopGroup(1); 079 080 private final EventLoopGroup workerGroup = new NioEventLoopGroup(); 081 082 private final Configuration conf; 083 084 private final int port; 085 086 private AsyncConnection conn; 087 088 private Channel serverChannel; 089 090 private ChannelGroup channelGroup; 091 092 public HttpProxyExample(Configuration conf, int port) { 093 this.conf = conf; 094 this.port = port; 095 } 096 097 private static final class Params { 098 public final String table; 099 100 public final String row; 101 102 public final String family; 103 104 public final String qualifier; 105 106 public Params(String table, String row, String family, String qualifier) { 107 this.table = table; 108 this.row = row; 109 this.family = family; 110 this.qualifier = qualifier; 111 } 112 } 113 114 private static final class RequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { 115 116 private final AsyncConnection conn; 117 118 private final ChannelGroup channelGroup; 119 120 public RequestHandler(AsyncConnection conn, ChannelGroup channelGroup) { 121 this.conn = conn; 122 this.channelGroup = channelGroup; 123 } 124 125 @Override 126 public void channelActive(ChannelHandlerContext ctx) { 127 channelGroup.add(ctx.channel()); 128 ctx.fireChannelActive(); 129 } 130 131 @Override 132 public void channelInactive(ChannelHandlerContext ctx) { 133 channelGroup.remove(ctx.channel()); 134 ctx.fireChannelInactive(); 135 } 136 137 private void write(ChannelHandlerContext ctx, HttpResponseStatus status) { 138 write(ctx, status, null); 139 } 140 141 private void write(ChannelHandlerContext ctx, HttpResponseStatus status, 142 String content) { 143 DefaultFullHttpResponse resp; 144 if (content != null) { 145 ByteBuf buf = ctx.alloc().buffer().writeBytes(Bytes.toBytes(content)); 146 resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buf); 147 resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes()); 148 } else { 149 resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status); 150 } 151 resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text-plain; charset=UTF-8"); 152 ctx.writeAndFlush(resp); 153 } 154 155 private Params parse(FullHttpRequest req) { 156 String[] components = new QueryStringDecoder(req.uri()).path().split("/"); 157 Preconditions.checkArgument(components.length == 4, "Unrecognized uri: %s", req.uri()); 158 // path is start with '/' so split will give an empty component 159 String[] cfAndCq = components[3].split(":"); 160 Preconditions.checkArgument(cfAndCq.length == 2, "Unrecognized uri: %s", req.uri()); 161 return new Params(components[1], components[2], cfAndCq[0], cfAndCq[1]); 162 } 163 164 private void get(ChannelHandlerContext ctx, FullHttpRequest req) { 165 Params params = parse(req); 166 addListener( 167 conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row)) 168 .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))), 169 (r, e) -> { 170 if (e != null) { 171 exceptionCaught(ctx, e); 172 } else { 173 byte[] value = 174 r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)); 175 if (value != null) { 176 write(ctx, HttpResponseStatus.OK, Bytes.toStringBinary(value)); 177 } else { 178 write(ctx, HttpResponseStatus.NOT_FOUND); 179 } 180 } 181 }); 182 } 183 184 private void put(ChannelHandlerContext ctx, FullHttpRequest req) { 185 Params params = parse(req); 186 byte[] value = new byte[req.content().readableBytes()]; 187 req.content().readBytes(value); 188 addListener( 189 conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row)) 190 .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)), 191 (r, e) -> { 192 if (e != null) { 193 exceptionCaught(ctx, e); 194 } else { 195 write(ctx, HttpResponseStatus.OK); 196 } 197 }); 198 } 199 200 @Override 201 protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) { 202 switch (req.method().name()) { 203 case "GET": 204 get(ctx, req); 205 break; 206 case "PUT": 207 put(ctx, req); 208 break; 209 default: 210 write(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); 211 break; 212 } 213 } 214 215 @Override 216 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 217 if (cause instanceof IllegalArgumentException) { 218 write(ctx, HttpResponseStatus.BAD_REQUEST, cause.getMessage()); 219 } else { 220 write(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, 221 Throwables.getStackTraceAsString(cause)); 222 } 223 } 224 } 225 226 public void start() throws InterruptedException, ExecutionException { 227 NettyRpcClientConfigHelper.setEventLoopConfig(conf, workerGroup, NioSocketChannel.class); 228 conn = ConnectionFactory.createAsyncConnection(conf).get(); 229 channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 230 serverChannel = new ServerBootstrap().group(bossGroup, workerGroup) 231 .channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, true) 232 .childOption(ChannelOption.SO_REUSEADDR, true) 233 .childHandler(new ChannelInitializer<Channel>() { 234 235 @Override 236 protected void initChannel(Channel ch) throws Exception { 237 ch.pipeline().addFirst(new HttpServerCodec(), new HttpObjectAggregator(4 * 1024 * 1024), 238 new RequestHandler(conn, channelGroup)); 239 } 240 }).bind(port).syncUninterruptibly().channel(); 241 } 242 243 public void join() { 244 serverChannel.closeFuture().awaitUninterruptibly(); 245 } 246 247 public int port() { 248 if (serverChannel == null) { 249 return port; 250 } else { 251 return ((InetSocketAddress) serverChannel.localAddress()).getPort(); 252 } 253 } 254 255 public void stop() throws IOException { 256 serverChannel.close().syncUninterruptibly(); 257 serverChannel = null; 258 channelGroup.close().syncUninterruptibly(); 259 channelGroup = null; 260 conn.close(); 261 conn = null; 262 } 263 264 public static void main(String[] args) throws InterruptedException, ExecutionException { 265 int port = Integer.parseInt(args[0]); 266 HttpProxyExample proxy = new HttpProxyExample(HBaseConfiguration.create(), port); 267 proxy.start(); 268 proxy.join(); 269 } 270}