001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; 021import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; 022 023import java.io.IOException; 024import java.io.PrintWriter; 025import java.io.StringWriter; 026import java.util.Arrays; 027import java.util.HashSet; 028import java.util.Set; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 036import org.apache.hadoop.util.StringUtils; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** Creates multiple threads that write key/values into the */ 041public class MultiThreadedWriter extends MultiThreadedWriterBase { 042 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriter.class); 043 044 protected Set<HBaseWriterThread> writers = new HashSet<>(); 045 046 protected boolean isMultiPut = false; 047 048 public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName) 049 throws IOException { 050 super(dataGen, conf, tableName, "W"); 051 } 052 053 /** Use multi-puts vs. separate puts for every column in a row */ 054 public void setMultiPut(boolean isMultiPut) { 055 this.isMultiPut = isMultiPut; 056 } 057 058 @Override 059 public void start(long startKey, long endKey, int numThreads) throws IOException { 060 super.start(startKey, endKey, numThreads); 061 062 if (verbose) { 063 LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); 064 } 065 066 createWriterThreads(numThreads); 067 068 startThreads(writers); 069 } 070 071 protected void createWriterThreads(int numThreads) throws IOException { 072 for (int i = 0; i < numThreads; ++i) { 073 HBaseWriterThread writer = new HBaseWriterThread(i); 074 Threads.setLoggingUncaughtExceptionHandler(writer); 075 writers.add(writer); 076 } 077 } 078 079 public class HBaseWriterThread extends Thread { 080 private final Table table; 081 082 public HBaseWriterThread(int writerId) throws IOException { 083 setName(getClass().getSimpleName() + "_" + writerId); 084 table = createTable(); 085 } 086 087 protected Table createTable() throws IOException { 088 return connection.getTable(tableName); 089 } 090 091 @Override 092 public void run() { 093 try { 094 long rowKeyBase; 095 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 096 while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) { 097 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase); 098 Put put = new Put(rowKey); 099 numKeys.addAndGet(1); 100 int columnCount = 0; 101 for (byte[] cf : columnFamilies) { 102 byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf); 103 for (byte[] column : columns) { 104 byte[] value = dataGenerator.generateValue(rowKey, cf, column); 105 put.addColumn(cf, column, value); 106 ++columnCount; 107 if (!isMultiPut) { 108 insert(table, put, rowKeyBase); 109 numCols.addAndGet(1); 110 put = new Put(rowKey); 111 } 112 } 113 long rowKeyHash = Arrays.hashCode(rowKey); 114 put.addColumn(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY); 115 put.addColumn(cf, INCREMENT, Bytes.toBytes(rowKeyHash)); 116 if (!isMultiPut) { 117 insert(table, put, rowKeyBase); 118 numCols.addAndGet(1); 119 put = new Put(rowKey); 120 } 121 } 122 if (isMultiPut) { 123 if (verbose) { 124 LOG.debug("Preparing put for key = [" + Bytes.toString(rowKey) + "], " + columnCount 125 + " columns"); 126 } 127 insert(table, put, rowKeyBase); 128 numCols.addAndGet(columnCount); 129 } 130 if (trackWroteKeys) { 131 wroteKeys.add(rowKeyBase); 132 } 133 } 134 } finally { 135 closeHTable(); 136 numThreadsWorking.decrementAndGet(); 137 } 138 } 139 140 public void insert(Table table, Put put, long keyBase) { 141 long start = EnvironmentEdgeManager.currentTime(); 142 try { 143 put = (Put) dataGenerator.beforeMutate(keyBase, put); 144 table.put(put); 145 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 146 } catch (IOException e) { 147 failedKeySet.add(keyBase); 148 String exceptionInfo; 149 if (e instanceof RetriesExhaustedWithDetailsException) { 150 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 151 exceptionInfo = aggEx.getExhaustiveDescription(); 152 } else { 153 StringWriter stackWriter = new StringWriter(); 154 PrintWriter pw = new PrintWriter(stackWriter); 155 e.printStackTrace(pw); 156 pw.flush(); 157 exceptionInfo = StringUtils.stringifyException(e); 158 } 159 LOG.error("Failed to insert: " + keyBase + " after " 160 + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " 161 + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + exceptionInfo); 162 } 163 } 164 165 protected void closeHTable() { 166 try { 167 if (table != null) { 168 table.close(); 169 } 170 } catch (IOException e) { 171 LOG.error("Error closing table", e); 172 } 173 } 174 } 175 176 @Override 177 public void waitForFinish() { 178 super.waitForFinish(); 179 System.out.println("Failed to write keys: " + failedKeySet.size()); 180 for (Long key : failedKeySet) { 181 System.out.println("Failed to write key: " + key); 182 } 183 } 184}