001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.util; 020 021import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; 022import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; 023 024import java.io.IOException; 025import java.io.PrintWriter; 026import java.io.StringWriter; 027import java.util.Arrays; 028import java.util.HashSet; 029import java.util.Set; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 038import org.apache.hadoop.util.StringUtils; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** Creates multiple threads that write key/values into the */ 043public class MultiThreadedWriter extends MultiThreadedWriterBase { 044 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriter.class); 045 046 protected Set<HBaseWriterThread> writers = new HashSet<>(); 047 048 protected boolean isMultiPut = false; 049 050 public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, 051 TableName tableName) throws IOException { 052 super(dataGen, conf, tableName, "W"); 053 } 054 055 /** Use multi-puts vs. separate puts for every column in a row */ 056 public void setMultiPut(boolean isMultiPut) { 057 this.isMultiPut = isMultiPut; 058 } 059 060 @Override 061 public void start(long startKey, long endKey, int numThreads) throws IOException { 062 super.start(startKey, endKey, numThreads); 063 064 if (verbose) { 065 LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); 066 } 067 068 createWriterThreads(numThreads); 069 070 startThreads(writers); 071 } 072 073 protected void createWriterThreads(int numThreads) throws IOException { 074 for (int i = 0; i < numThreads; ++i) { 075 HBaseWriterThread writer = new HBaseWriterThread(i); 076 Threads.setLoggingUncaughtExceptionHandler(writer); 077 writers.add(writer); 078 } 079 } 080 081 public class HBaseWriterThread extends Thread { 082 private final Table table; 083 084 public HBaseWriterThread(int writerId) throws IOException { 085 setName(getClass().getSimpleName() + "_" + writerId); 086 table = createTable(); 087 } 088 089 protected Table createTable() throws IOException { 090 return connection.getTable(tableName); 091 } 092 093 @Override 094 public void run() { 095 try { 096 long rowKeyBase; 097 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 098 while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) { 099 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase); 100 Put put = new Put(rowKey); 101 numKeys.addAndGet(1); 102 int columnCount = 0; 103 for (byte[] cf : columnFamilies) { 104 byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf); 105 for (byte[] column : columns) { 106 byte[] value = dataGenerator.generateValue(rowKey, cf, column); 107 put.addColumn(cf, column, value); 108 ++columnCount; 109 if (!isMultiPut) { 110 insert(table, put, rowKeyBase); 111 numCols.addAndGet(1); 112 put = new Put(rowKey); 113 } 114 } 115 long rowKeyHash = Arrays.hashCode(rowKey); 116 put.addColumn(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY); 117 put.addColumn(cf, INCREMENT, Bytes.toBytes(rowKeyHash)); 118 if (!isMultiPut) { 119 insert(table, put, rowKeyBase); 120 numCols.addAndGet(1); 121 put = new Put(rowKey); 122 } 123 } 124 if (isMultiPut) { 125 if (verbose) { 126 LOG.debug("Preparing put for key = [" + Bytes.toString(rowKey) + "], " + columnCount + " columns"); 127 } 128 insert(table, put, rowKeyBase); 129 numCols.addAndGet(columnCount); 130 } 131 if (trackWroteKeys) { 132 wroteKeys.add(rowKeyBase); 133 } 134 } 135 } finally { 136 closeHTable(); 137 numThreadsWorking.decrementAndGet(); 138 } 139 } 140 141 public void insert(Table table, Put put, long keyBase) { 142 long start = System.currentTimeMillis(); 143 try { 144 put = (Put) dataGenerator.beforeMutate(keyBase, put); 145 table.put(put); 146 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); 147 } catch (IOException e) { 148 failedKeySet.add(keyBase); 149 String exceptionInfo; 150 if (e instanceof RetriesExhaustedWithDetailsException) { 151 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; 152 exceptionInfo = aggEx.getExhaustiveDescription(); 153 } else { 154 StringWriter stackWriter = new StringWriter(); 155 PrintWriter pw = new PrintWriter(stackWriter); 156 e.printStackTrace(pw); 157 pw.flush(); 158 exceptionInfo = StringUtils.stringifyException(e); 159 } 160 LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) 161 + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) 162 + "; errors: " + exceptionInfo); 163 } 164 } 165 protected void closeHTable() { 166 try { 167 if (table != null) { 168 table.close(); 169 } 170 } catch (IOException e) { 171 LOG.error("Error closing table", e); 172 } 173 } 174 } 175 176 @Override 177 public void waitForFinish() { 178 super.waitForFinish(); 179 System.out.println("Failed to write keys: " + failedKeySet.size()); 180 for (Long key : failedKeySet) { 181 System.out.println("Failed to write key: " + key); 182 } 183 } 184}