/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.filter.KeyOnlyFilter; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.util.Tool; 048import org.apache.hadoop.util.ToolRunner; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 054 055/** 056 * Example on how to use HBase's {@link Connection} and {@link Table} in a multi-threaded 057 * environment. Each table is a light weight object that is created and thrown away. Connections are 058 * heavy weight objects that hold on to zookeeper connections, async processes, and other state. 059 * 060 * <pre> 061 * Usage: 062 * bin/hbase org.apache.hadoop.hbase.client.example.MultiThreadedClientExample testTableName 500000 063 * </pre> 064 * <p> 065 * The table should already be created before running the command. This example expects one column 066 * family named d. 067 * </p> 068 * <p> 069 * This is meant to show different operations that are likely to be done in a real world 070 * application. These operations are: 071 * </p> 072 * <ul> 073 * <li>30% of all operations performed are batch writes. 30 puts are created and sent out at a time. 074 * The response for all puts is waited on.</li> 075 * <li>20% of all operations are single writes. A single put is sent out and the response is waited 076 * for.</li> 077 * <li>50% of all operations are scans. These scans start at a random place and scan up to 100 rows. 078 * </li> 079 * </ul> 080 */ 081@InterfaceAudience.Private 082public class MultiThreadedClientExample extends Configured implements Tool { 083 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedClientExample.class); 084 private static final int DEFAULT_NUM_OPERATIONS = 500000; 085 086 /** 087 * The name of the column family. d for default. 
088 */ 089 private static final byte[] FAMILY = Bytes.toBytes("d"); 090 091 /** 092 * For the example we're just using one qualifier. 093 */ 094 private static final byte[] QUAL = Bytes.toBytes("test"); 095 096 private final ExecutorService internalPool; 097 098 private final int threads; 099 100 public MultiThreadedClientExample() throws IOException { 101 // Base number of threads. 102 // This represents the number of threads you application has 103 // that can be interacting with an hbase client. 104 this.threads = Runtime.getRuntime().availableProcessors() * 4; 105 106 // Daemon threads are great for things that get shut down. 107 ThreadFactory threadFactory = 108 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("internal-pol-%d").build(); 109 110 this.internalPool = Executors.newFixedThreadPool(threads, threadFactory); 111 } 112 113 @Override 114 public int run(String[] args) throws Exception { 115 116 if (args.length < 1 || args.length > 2) { 117 System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]"); 118 return -1; 119 } 120 121 final TableName tableName = TableName.valueOf(args[0]); 122 int numOperations = DEFAULT_NUM_OPERATIONS; 123 124 // the second arg is the number of operations to send. 125 if (args.length == 2) { 126 numOperations = Integer.parseInt(args[1]); 127 } 128 129 // Threads for the client only. 130 // 131 // We don't want to mix hbase and business logic. 132 // 133 ExecutorService service = new ForkJoinPool(threads * 2); 134 135 // Create two different connections showing how it's possible to 136 // separate different types of requests onto different connections 137 final Connection writeConnection = ConnectionFactory.createConnection(getConf(), service); 138 final Connection readConnection = ConnectionFactory.createConnection(getConf(), service); 139 140 // At this point the entire cache for the region locations is full. 
141 // Only do this if the number of regions in a table is easy to fit into memory. 142 // 143 // If you are interacting with more than 25k regions on a client then it's probably not good 144 // to do this at all. 145 warmUpConnectionCache(readConnection, tableName); 146 warmUpConnectionCache(writeConnection, tableName); 147 148 List<Future<Boolean>> futures = new ArrayList<>(numOperations); 149 for (int i = 0; i < numOperations; i++) { 150 double r = ThreadLocalRandom.current().nextDouble(); 151 Future<Boolean> f; 152 153 // For the sake of generating some synthetic load this queues 154 // some different callables. 155 // These callables are meant to represent real work done by your application. 156 if (r < .30) { 157 f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName)); 158 } else if (r < .50) { 159 f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName)); 160 } else { 161 f = internalPool.submit(new ReadExampleCallable(writeConnection, tableName)); 162 } 163 futures.add(f); 164 } 165 166 // Wait a long time for all the reads/writes to complete 167 for (Future<Boolean> f : futures) { 168 f.get(10, TimeUnit.MINUTES); 169 } 170 171 // Clean up after our selves for cleanliness 172 internalPool.shutdownNow(); 173 service.shutdownNow(); 174 return 0; 175 } 176 177 private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException { 178 try (RegionLocator locator = connection.getRegionLocator(tn)) { 179 LOG.info("Warmed up region location cache for " + tn + " got " 180 + locator.getAllRegionLocations().size()); 181 } 182 } 183 184 /** 185 * Class that will show how to send batches of puts at the same time. 
186 */ 187 public static class WriteExampleCallable implements Callable<Boolean> { 188 private final Connection connection; 189 private final TableName tableName; 190 191 public WriteExampleCallable(Connection connection, TableName tableName) { 192 this.connection = connection; 193 this.tableName = tableName; 194 } 195 196 @Override 197 public Boolean call() throws Exception { 198 199 // Table implements Closable so we use the try with resource structure here. 200 // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html 201 try (Table t = connection.getTable(tableName)) { 202 byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble())); 203 int rows = 30; 204 205 // Array to put the batch 206 ArrayList<Put> puts = new ArrayList<>(rows); 207 for (int i = 0; i < 30; i++) { 208 byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); 209 Put p = new Put(rk); 210 p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rk).setFamily(FAMILY) 211 .setQualifier(QUAL).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put) 212 .setValue(value).build()); 213 puts.add(p); 214 } 215 216 // now that we've assembled the batch it's time to push it to hbase. 217 t.put(puts); 218 } 219 return true; 220 } 221 } 222 223 /** 224 * Class to show how to send a single put. 
225 */ 226 public static class SingleWriteExampleCallable implements Callable<Boolean> { 227 private final Connection connection; 228 private final TableName tableName; 229 230 public SingleWriteExampleCallable(Connection connection, TableName tableName) { 231 this.connection = connection; 232 this.tableName = tableName; 233 } 234 235 @Override 236 public Boolean call() throws Exception { 237 try (Table t = connection.getTable(tableName)) { 238 239 byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble())); 240 byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); 241 Put p = new Put(rk); 242 p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rk).setFamily(FAMILY) 243 .setQualifier(QUAL).setTimestamp(p.getTimestamp()).setType(Type.Put).setValue(value) 244 .build()); 245 t.put(p); 246 } 247 return true; 248 } 249 } 250 251 /** 252 * Class to show how to scan some rows starting at a random location. 253 */ 254 public static class ReadExampleCallable implements Callable<Boolean> { 255 private final Connection connection; 256 private final TableName tableName; 257 258 public ReadExampleCallable(Connection connection, TableName tableName) { 259 this.connection = connection; 260 this.tableName = tableName; 261 } 262 263 @Override 264 public Boolean call() throws Exception { 265 266 // total length in bytes of all read rows. 267 int result = 0; 268 269 // Number of rows the scan will read before being considered done. 270 int toRead = 100; 271 try (Table t = connection.getTable(tableName)) { 272 byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); 273 Scan s = new Scan().withStartRow(rk); 274 275 // This filter will keep the values from being sent accross the wire. 276 // This is good for counting or other scans that are checking for 277 // existence and don't rely on the value. 278 s.setFilter(new KeyOnlyFilter()); 279 280 // Don't go back to the server for every single row. 
281 // We know these rows are small. So ask for 20 at a time. 282 // This would be application specific. 283 // 284 // The goal is to reduce round trips but asking for too 285 // many rows can lead to GC problems on client and server sides. 286 s.setCaching(20); 287 288 // Don't use the cache. While this is a silly test program it's still good to be 289 // explicit that scans normally don't use the block cache. 290 s.setCacheBlocks(false); 291 292 // Open up the scanner and close it automatically when done. 293 try (ResultScanner rs = t.getScanner(s)) { 294 295 // Now go through rows. 296 for (Result r : rs) { 297 // Keep track of things size to simulate doing some real work. 298 result += r.getRow().length; 299 toRead -= 1; 300 301 // Most online applications won't be 302 // reading the entire table so this break 303 // simulates small to medium size scans, 304 // without needing to know an end row. 305 if (toRead <= 0) { 306 break; 307 } 308 } 309 } 310 } 311 return result > 0; 312 } 313 } 314 315 public static void main(String[] args) throws Exception { 316 ToolRunner.run(new MultiThreadedClientExample(), args); 317 } 318}