View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.example;
20  
21  import com.google.common.util.concurrent.ThreadFactoryBuilder;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configured;
25  import org.apache.hadoop.hbase.TableName;
26  import org.apache.hadoop.hbase.client.Connection;
27  import org.apache.hadoop.hbase.client.ConnectionFactory;
28  import org.apache.hadoop.hbase.client.Put;
29  import org.apache.hadoop.hbase.client.RegionLocator;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.client.ResultScanner;
32  import org.apache.hadoop.hbase.client.Scan;
33  import org.apache.hadoop.hbase.client.Table;
34  import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.hadoop.util.Tool;
37  import org.apache.hadoop.util.ToolRunner;
38  
39  import java.io.IOException;
40  import java.util.ArrayList;
41  import java.util.List;
42  import java.util.concurrent.Callable;
43  import java.util.concurrent.ExecutorService;
44  import java.util.concurrent.Executors;
45  import java.util.concurrent.ForkJoinPool;
46  import java.util.concurrent.Future;
47  import java.util.concurrent.ThreadFactory;
48  import java.util.concurrent.ThreadLocalRandom;
49  import java.util.concurrent.TimeUnit;
50  
51  
52  /**
53   * Example on how to use HBase's {@link Connection} and {@link Table} in a
54   * multi-threaded environment. Each table is a light weight object
55   * that is created and thrown away. Connections are heavy weight objects
56   * that hold on to zookeeper connections, async processes, and other state.
57   *
58   * <pre>
59   * Usage:
60   * bin/hbase org.apache.hadoop.hbase.client.example.MultiThreadedClientExample testTableName 500000
61   * </pre>
62   *
63   * <p>
64   * The table should already be created before running the command.
65   * This example expects one column family named d.
66   * </p>
67   * <p>
68   * This is meant to show different operations that are likely to be
69   * done in a real world application. These operations are:
70   * </p>
71   *
72   * <ul>
73   *   <li>
74   *     30% of all operations performed are batch writes.
75   *     30 puts are created and sent out at a time.
76   *     The response for all puts is waited on.
77   *   </li>
78   *   <li>
79   *     20% of all operations are single writes.
80   *     A single put is sent out and the response is waited for.
81   *   </li>
82   *   <li>
83   *     50% of all operations are scans.
84   *     These scans start at a random place and scan up to 100 rows.
85   *   </li>
86   * </ul>
87   *
88   */
89  public class MultiThreadedClientExample extends Configured implements Tool {
90    private static final Log LOG = LogFactory.getLog(MultiThreadedClientExample.class);
91    private static final int DEFAULT_NUM_OPERATIONS = 500000;
92  
93    /**
94     * The name of the column family.
95     *
96     * d for default.
97     */
98    private static final byte[] FAMILY = Bytes.toBytes("d");
99  
100   /**
101    * For the example we're just using one qualifier.
102    */
103   private static final byte[] QUAL = Bytes.toBytes("test");
104 
105   private final ExecutorService internalPool;
106 
107   private final int threads;
108 
109   public MultiThreadedClientExample() throws IOException {
110     // Base number of threads.
111     // This represents the number of threads you application has
112     // that can be interacting with an hbase client.
113     this.threads = Runtime.getRuntime().availableProcessors() * 4;
114 
115     // Daemon threads are great for things that get shut down.
116     ThreadFactory threadFactory = new ThreadFactoryBuilder()
117         .setDaemon(true).setNameFormat("internal-pol-%d").build();
118 
119 
120     this.internalPool = Executors.newFixedThreadPool(threads, threadFactory);
121   }
122 
123   @Override
124   public int run(String[] args) throws Exception {
125 
126     if (args.length < 1 || args.length > 2) {
127       System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]");
128       return -1;
129     }
130 
131     final TableName tableName = TableName.valueOf(args[0]);
132     int numOperations = DEFAULT_NUM_OPERATIONS;
133 
134     // the second arg is the number of operations to send.
135     if (args.length == 2) {
136       numOperations = Integer.parseInt(args[1]);
137     }
138 
139     // Threads for the client only.
140     //
141     // We don't want to mix hbase and business logic.
142     //
143     ExecutorService service = new ForkJoinPool(threads * 2);
144 
145     // Create two different connections showing how it's possible to
146     // separate different types of requests onto different connections
147     final Connection writeConnection = ConnectionFactory.createConnection(getConf(), service);
148     final Connection readConnection = ConnectionFactory.createConnection(getConf(), service);
149 
150     // At this point the entire cache for the region locations is full.
151     // Only do this if the number of regions in a table is easy to fit into memory.
152     //
153     // If you are interacting with more than 25k regions on a client then it's probably not good
154     // to do this at all.
155     warmUpConnectionCache(readConnection, tableName);
156     warmUpConnectionCache(writeConnection, tableName);
157 
158     List<Future<Boolean>> futures = new ArrayList<>(numOperations);
159     for (int i = 0; i < numOperations; i++) {
160       double r = ThreadLocalRandom.current().nextDouble();
161       Future<Boolean> f;
162 
163       // For the sake of generating some synthetic load this queues
164       // some different callables.
165       // These callables are meant to represent real work done by your application.
166       if (r < .30) {
167         f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName));
168       } else if (r < .50) {
169         f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName));
170       } else {
171         f = internalPool.submit(new ReadExampleCallable(writeConnection, tableName));
172       }
173       futures.add(f);
174     }
175 
176     // Wait a long time for all the reads/writes to complete
177     for (Future<Boolean> f : futures) {
178       f.get(10, TimeUnit.MINUTES);
179     }
180 
181     // Clean up after our selves for cleanliness
182     internalPool.shutdownNow();
183     service.shutdownNow();
184     return 0;
185   }
186 
187   private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException {
188     try (RegionLocator locator = connection.getRegionLocator(tn)) {
189       LOG.info(
190           "Warmed up region location cache for " + tn
191               + " got " + locator.getAllRegionLocations().size());
192     }
193   }
194 
195   /**
196    * Class that will show how to send batches of puts at the same time.
197    */
198   public static class WriteExampleCallable implements Callable<Boolean> {
199     private final Connection connection;
200     private final TableName tableName;
201 
202     public WriteExampleCallable(Connection connection, TableName tableName) {
203       this.connection = connection;
204       this.tableName = tableName;
205     }
206 
207     @Override
208     public Boolean call() throws Exception {
209 
210       // Table implements Closable so we use the try with resource structure here.
211       // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
212       try (Table t = connection.getTable(tableName)) {
213         byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
214         int rows = 30;
215 
216         // Array to put the batch
217         ArrayList<Put> puts = new ArrayList<>(rows);
218         for (int i = 0; i < 30; i++) {
219           byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
220           Put p = new Put(rk);
221           p.addImmutable(FAMILY, QUAL, value);
222           puts.add(p);
223         }
224 
225         // now that we've assembled the batch it's time to push it to hbase.
226         t.put(puts);
227       }
228       return true;
229     }
230   }
231 
232   /**
233    * Class to show how to send a single put.
234    */
235   public static class SingleWriteExampleCallable implements Callable<Boolean> {
236     private final Connection connection;
237     private final TableName tableName;
238 
239     public SingleWriteExampleCallable(Connection connection, TableName tableName) {
240       this.connection = connection;
241       this.tableName = tableName;
242     }
243 
244     @Override
245     public Boolean call() throws Exception {
246       try (Table t = connection.getTable(tableName)) {
247 
248         byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
249         byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
250         Put p = new Put(rk);
251         p.addImmutable(FAMILY, QUAL, value);
252         t.put(p);
253       }
254       return true;
255     }
256   }
257 
258 
259   /**
260    * Class to show how to scan some rows starting at a random location.
261    */
262   public static class ReadExampleCallable implements Callable<Boolean> {
263     private final Connection connection;
264     private final TableName tableName;
265 
266     public ReadExampleCallable(Connection connection, TableName tableName) {
267       this.connection = connection;
268       this.tableName = tableName;
269     }
270 
271     @Override
272     public Boolean call() throws Exception {
273 
274       // total length in bytes of all read rows.
275       int result = 0;
276 
277       // Number of rows the scan will read before being considered done.
278       int toRead = 100;
279       try (Table t = connection.getTable(tableName)) {
280         byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
281         Scan s = new Scan(rk);
282 
283         // This filter will keep the values from being sent accross the wire.
284         // This is good for counting or other scans that are checking for
285         // existence and don't rely on the value.
286         s.setFilter(new KeyOnlyFilter());
287 
288         // Don't go back to the server for every single row.
289         // We know these rows are small. So ask for 20 at a time.
290         // This would be application specific.
291         //
292         // The goal is to reduce round trips but asking for too
293         // many rows can lead to GC problems on client and server sides.
294         s.setCaching(20);
295 
296         // Don't use the cache. While this is a silly test program it's still good to be
297         // explicit that scans normally don't use the block cache.
298         s.setCacheBlocks(false);
299 
300         // Open up the scanner and close it automatically when done.
301         try (ResultScanner rs = t.getScanner(s)) {
302 
303           // Now go through rows.
304           for (Result r : rs) {
305             // Keep track of things size to simulate doing some real work.
306             result += r.getRow().length;
307             toRead -= 1;
308 
309             // Most online applications won't be
310             // reading the entire table so this break
311             // simulates small to medium size scans,
312             // without needing to know an end row.
313             if (toRead <= 0)  {
314               break;
315             }
316           }
317         }
318       }
319       return result > 0;
320     }
321   }
322 
323   public static void main(String[] args) throws Exception {
324     ToolRunner.run(new MultiThreadedClientExample(), args);
325   }
326 }