001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.example; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.TimeoutException; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.conf.Configured; 034import org.apache.hadoop.hbase.AuthUtil; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.BufferedMutator; 037import org.apache.hadoop.hbase.client.BufferedMutatorParams; 038import org.apache.hadoop.hbase.client.Connection; 039import org.apache.hadoop.hbase.client.ConnectionFactory; 040import org.apache.hadoop.hbase.client.Put; 041import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.util.Tool; 044import org.apache.hadoop.util.ToolRunner; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * An example of using the {@link BufferedMutator} interface. 051 */ 052@InterfaceAudience.Private 053public class BufferedMutatorExample extends Configured implements Tool { 054 055 private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorExample.class); 056 057 private static final int POOL_SIZE = 10; 058 private static final int TASK_COUNT = 100; 059 private static final TableName TABLE = TableName.valueOf("foo"); 060 private static final byte[] FAMILY = Bytes.toBytes("f"); 061 062 @Override 063 public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException { 064 065 /** a callback invoked when an asynchronous write fails. */ 066 final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { 067 @Override 068 public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) { 069 for (int i = 0; i < e.getNumExceptions(); i++) { 070 LOG.info("Failed to sent put " + e.getRow(i) + "."); 071 } 072 } 073 }; 074 075 BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener) 076 .setRequestAttribute("requestInfo", Bytes.toBytes("bar")); 077 078 // 079 // step 1: create a single Connection and a BufferedMutator, shared by all worker threads. 080 // 081 Map<String, byte[]> connectionAttributes = new HashMap<>(); 082 connectionAttributes.put("clientId", Bytes.toBytes("foo")); 083 Configuration conf = getConf(); 084 try ( 085 final Connection conn = ConnectionFactory.createConnection(conf, null, 086 AuthUtil.loginClient(conf), connectionAttributes); 087 final BufferedMutator mutator = conn.getBufferedMutator(params)) { 088 089 /** worker pool that operates on BufferedTable instances */ 090 final ExecutorService workerPool = Executors.newFixedThreadPool(POOL_SIZE); 091 List<Future<Void>> futures = new ArrayList<>(TASK_COUNT); 092 093 for (int i = 0; i < TASK_COUNT; i++) { 094 futures.add(workerPool.submit(new Callable<Void>() { 095 @Override 096 public Void call() throws Exception { 097 // 098 // step 2: each worker sends edits to the shared BufferedMutator instance. They all use 099 // the same backing buffer, call-back "listener", and RPC executor pool. 100 // 101 Put p = new Put(Bytes.toBytes("someRow")); 102 p.addColumn(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value")); 103 mutator.mutate(p); 104 // do work... maybe you want to call mutator.flush() after many edits to ensure any of 105 // this worker's edits are sent before exiting the Callable 106 return null; 107 } 108 })); 109 } 110 111 // 112 // step 3: clean up the worker pool, shut down. 113 // 114 for (Future<Void> f : futures) { 115 f.get(5, TimeUnit.MINUTES); 116 } 117 workerPool.shutdown(); 118 mutator.flush(); 119 } catch (IOException e) { 120 // exception while creating/destroying Connection or BufferedMutator 121 LOG.info("exception while creating/destroying Connection or BufferedMutator", e); 122 } // BufferedMutator.close() ensures all work is flushed. Could be the custom listener is 123 // invoked from here. 124 return 0; 125 } 126 127 public static void main(String[] args) throws Exception { 128 ToolRunner.run(new BufferedMutatorExample(), args); 129 } 130}