/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Example on how to use HBase's {@link Connection} and {@link Table} in a
 * multi-threaded environment. Each table is a light weight object
 * that is created and thrown away. Connections are heavy weight objects
 * that hold on to zookeeper connections, async processes, and other state.
 *
 * <pre>
 * Usage:
 * bin/hbase org.apache.hadoop.hbase.client.example.MultiThreadedClientExample testTableName 500000
 * </pre>
 *
 * <p>
 * The table should already be created before running the command.
 * This example expects one column family named d.
 * </p>
 * <p>
 * This is meant to show different operations that are likely to be
 * done in a real world application. These operations are:
 * </p>
 *
 * <ul>
 *   <li>
 *     30% of all operations performed are batch writes.
 *     30 puts are created and sent out at a time.
 *     The response for all puts is waited on.
 *   </li>
 *   <li>
 *     20% of all operations are single writes.
 *     A single put is sent out and the response is waited for.
 *   </li>
 *   <li>
 *     50% of all operations are scans.
 *     These scans start at a random place and scan up to 100 rows.
 *   </li>
 * </ul>
 *
 */
@InterfaceAudience.Private
public class MultiThreadedClientExample extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedClientExample.class);
  private static final int DEFAULT_NUM_OPERATIONS = 500000;

  /**
   * The name of the column family.
   *
   * d for default.
   */
  private static final byte[] FAMILY = Bytes.toBytes("d");

  /**
   * For the example we're just using one qualifier.
   */
  private static final byte[] QUAL = Bytes.toBytes("test");

  /** Pool that runs the example's "application" work (the callables defined below). */
  private final ExecutorService internalPool;

  /** Number of application threads; the HBase client pool is sized relative to this. */
  private final int threads;

  /**
   * Creates the example and the fixed-size daemon thread pool that simulates application threads.
   *
   * @throws IOException never thrown by this implementation; kept so existing callers compile
   */
  public MultiThreadedClientExample() throws IOException {
    // Base number of threads.
    // This represents the number of threads your application has
    // that can be interacting with an hbase client.
    this.threads = Runtime.getRuntime().availableProcessors() * 4;

    // Daemon threads are great for things that get shut down.
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
      .setDaemon(true).setNameFormat("internal-pool-%d").build();

    this.internalPool = Executors.newFixedThreadPool(threads, threadFactory);
  }

  /**
   * Runs the synthetic workload against an existing table.
   *
   * <p>
   * The application pool owned by this instance is shut down before this method returns (or
   * throws) once the argument checks have passed, so an instance can only run one workload.
   * </p>
   *
   * @param args {@code tableName [num_operations]}; {@code num_operations} must be a
   *          non-negative integer and defaults to 500000
   * @return 0 when every operation completed, -1 when the arguments are invalid
   * @throws Exception if warming the caches fails, or if any operation fails or does not
   *           complete within ten minutes
   */
  @Override
  public int run(String[] args) throws Exception {

    if (args.length < 1 || args.length > 2) {
      printUsage();
      return -1;
    }

    final TableName tableName = TableName.valueOf(args[0]);
    int numOperations = DEFAULT_NUM_OPERATIONS;

    // the second arg is the number of operations to send.
    if (args.length == 2) {
      try {
        numOperations = Integer.parseInt(args[1]);
      } catch (NumberFormatException e) {
        // A malformed count is a usage error, not a crash.
        numOperations = -1;
      }
      if (numOperations < 0) {
        System.out.println("num_operations must be a non-negative integer, got: " + args[1]);
        printUsage();
        return -1;
      }
    }

    // Threads for the client only.
    //
    // We don't want to mix hbase and business logic.
    //
    final ExecutorService service = new ForkJoinPool(threads * 2);

    // Create two different connections showing how it's possible to
    // separate different types of requests onto different connections.
    // Connections are heavy weight, so make sure they are closed when we are done.
    try (Connection writeConnection = ConnectionFactory.createConnection(getConf(), service);
      Connection readConnection = ConnectionFactory.createConnection(getConf(), service)) {
      try {
        // At this point the entire cache for the region locations is full.
        // Only do this if the number of regions in a table is easy to fit into memory.
        //
        // If you are interacting with more than 25k regions on a client then it's probably not
        // good to do this at all.
        warmUpConnectionCache(readConnection, tableName);
        warmUpConnectionCache(writeConnection, tableName);

        List<Future<Boolean>> futures = new ArrayList<>(numOperations);
        for (int i = 0; i < numOperations; i++) {
          double r = ThreadLocalRandom.current().nextDouble();
          Future<Boolean> f;

          // For the sake of generating some synthetic load this queues
          // some different callables.
          // These callables are meant to represent real work done by your application.
          if (r < .30) {
            f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName));
          } else if (r < .50) {
            f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName));
          } else {
            // Scans go over the dedicated read connection.
            f = internalPool.submit(new ReadExampleCallable(readConnection, tableName));
          }
          futures.add(f);
        }

        // Wait a long time for all the reads/writes to complete
        for (Future<Boolean> f : futures) {
          f.get(10, TimeUnit.MINUTES);
        }
      } finally {
        // Stop the application work first so nothing is still using the connections
        // when try-with-resources closes them.
        internalPool.shutdownNow();
      }
    } finally {
      // Clean up after ourselves for cleanliness. The client pool was supplied by us,
      // so it is ours to shut down once both connections are closed.
      service.shutdownNow();
    }
    return 0;
  }

  /** Prints the command line usage to standard out. */
  private void printUsage() {
    System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]");
  }

  /**
   * Fetches every region location of {@code tn} through {@code connection} and logs how many
   * locations were returned.
   *
   * @param connection the connection to look the regions up with
   * @param tn the table whose region locations are fetched
   * @throws IOException if the region locations cannot be fetched
   */
  private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(tn)) {
      LOG.info("Warmed up region location cache for {} got {}", tn,
        locator.getAllRegionLocations().size());
    }
  }

  /**
   * Builds a {@link Put} for a random row key that stores {@code value} in the single
   * {@code d:test} column used by this example.
   *
   * @param value the cell value to write
   * @return a put holding exactly one cell
   * @throws IOException if the cell is rejected by the put
   */
  private static Put createRandomPut(byte[] value) throws IOException {
    byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
    Put put = new Put(rk);
    Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
      .setRow(rk)
      .setFamily(FAMILY)
      .setQualifier(QUAL)
      .setTimestamp(put.getTimestamp())
      .setType(Type.Put)
      .setValue(value)
      .build();
    put.add(cell);
    return put;
  }

  /**
   * Class that will show how to send batches of puts at the same time.
   */
  public static class WriteExampleCallable implements Callable<Boolean> {
    private final Connection connection;
    private final TableName tableName;

    public WriteExampleCallable(Connection connection, TableName tableName) {
      this.connection = connection;
      this.tableName = tableName;
    }

    /**
     * Sends one batch of 30 puts, all carrying the same random value, and waits for the
     * response.
     *
     * @return always {@code true}; failures surface as exceptions
     */
    @Override
    public Boolean call() throws Exception {

      // Table implements Closeable so we use the try with resource structure here.
      // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
      try (Table t = connection.getTable(tableName)) {
        byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
        int rows = 30;

        // List to hold the batch
        List<Put> puts = new ArrayList<>(rows);
        for (int i = 0; i < rows; i++) {
          puts.add(createRandomPut(value));
        }

        // now that we've assembled the batch it's time to push it to hbase.
        t.put(puts);
      }
      return true;
    }
  }

  /**
   * Class to show how to send a single put.
   */
  public static class SingleWriteExampleCallable implements Callable<Boolean> {
    private final Connection connection;
    private final TableName tableName;

    public SingleWriteExampleCallable(Connection connection, TableName tableName) {
      this.connection = connection;
      this.tableName = tableName;
    }

    /**
     * Sends a single put with a random row key and random value and waits for the response.
     *
     * @return always {@code true}; failures surface as exceptions
     */
    @Override
    public Boolean call() throws Exception {
      try (Table t = connection.getTable(tableName)) {
        byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
        t.put(createRandomPut(value));
      }
      return true;
    }
  }


  /**
   * Class to show how to scan some rows starting at a random location.
   */
  public static class ReadExampleCallable implements Callable<Boolean> {
    private final Connection connection;
    private final TableName tableName;

    public ReadExampleCallable(Connection connection, TableName tableName) {
      this.connection = connection;
      this.tableName = tableName;
    }

    /**
     * Scans up to 100 rows starting at a random row key, fetching keys only.
     *
     * @return {@code true} if at least one row with a non-empty row key was read
     */
    @Override
    public Boolean call() throws Exception {

      // total length in bytes of all read rows.
      int result = 0;

      // Number of rows the scan will read before being considered done.
      int toRead = 100;
      try (Table t = connection.getTable(tableName)) {
        byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
        // Start (inclusively) at the random row; replaces the deprecated Scan(byte[]) constructor.
        Scan s = new Scan().withStartRow(rk);

        // This filter will keep the values from being sent across the wire.
        // This is good for counting or other scans that are checking for
        // existence and don't rely on the value.
        s.setFilter(new KeyOnlyFilter());

        // Don't go back to the server for every single row.
        // We know these rows are small. So ask for 20 at a time.
        // This would be application specific.
        //
        // The goal is to reduce round trips but asking for too
        // many rows can lead to GC problems on client and server sides.
        s.setCaching(20);

        // Don't use the cache. While this is a silly test program it's still good to be
        // explicit that scans normally don't use the block cache.
        s.setCacheBlocks(false);

        // Open up the scanner and close it automatically when done.
        try (ResultScanner rs = t.getScanner(s)) {

          // Now go through rows.
          for (Result r : rs) {
            // Keep track of things size to simulate doing some real work.
            result += r.getRow().length;
            toRead -= 1;

            // Most online applications won't be
            // reading the entire table so this break
            // simulates small to medium size scans,
            // without needing to know an end row.
            if (toRead <= 0) {
              break;
            }
          }
        }
      }
      return result > 0;
    }
  }

  /**
   * Command line entry point; delegates argument handling to {@link ToolRunner}.
   * The exit code returned by {@link #run(String[])} is intentionally not turned into a
   * {@code System.exit} call so that programmatic callers are not terminated.
   */
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new MultiThreadedClientExample(), args);
  }
}