001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; 021import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; 022 023import java.io.IOException; 024import java.io.PrintWriter; 025import java.io.StringWriter; 026import java.util.Arrays; 027import java.util.HashSet; 028import java.util.Set; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 036import org.apache.hadoop.util.StringUtils; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** Creates multiple threads that write key/values into the */ 042@InterfaceAudience.Private 043public class MultiThreadedWriter extends MultiThreadedWriterBase { 044 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriter.class); 045 046 protected Set<HBaseWriterThread> writers = new HashSet<>(); 047 048 protected boolean isMultiPut = false; 049 050 public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName) 051 throws IOException { 052 super(dataGen, conf, tableName, "W"); 053 } 054 055 /** Use multi-puts vs. separate puts for every column in a row */ 056 public void setMultiPut(boolean isMultiPut) { 057 this.isMultiPut = isMultiPut; 058 } 059 060 @Override 061 public void start(long startKey, long endKey, int numThreads) throws IOException { 062 super.start(startKey, endKey, numThreads); 063 064 if (verbose) { 065 LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); 066 } 067 068 createWriterThreads(numThreads); 069 070 startThreads(writers); 071 } 072 073 protected void createWriterThreads(int numThreads) throws IOException { 074 for (int i = 0; i < numThreads; ++i) { 075 HBaseWriterThread writer = new HBaseWriterThread(i); 076 Threads.setLoggingUncaughtExceptionHandler(writer); 077 writers.add(writer); 078 } 079 } 080 081 public class HBaseWriterThread extends Thread { 082 private final Table table; 083 084 public HBaseWriterThread(int writerId) throws IOException { 085 setName(getClass().getSimpleName() + "_" + writerId); 086 table = createTable(); 087 } 088 089 protected Table createTable() throws IOException { 090 return connection.getTable(tableName); 091 } 092 093 @Override 094 public void run() { 095 try { 096 long rowKeyBase; 097 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 098 while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) { 099 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase); 100 Put put = new Put(rowKey); 101 numKeys.addAndGet(1); 102 int columnCount = 0; 103 for (byte[] cf : columnFamilies) { 104 byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf); 105 for (byte[] column : columns) { 106 byte[] value = dataGenerator.generateValue(rowKey, cf, column); 107 put.addColumn(cf, column, value); 108 ++columnCount; 109 if (!isMultiPut) { 110 insert(table, put, rowKeyBase); 111 numCols.addAndGet(1); 112 put = new Put(rowKey); 113 } 114 } 115 long rowKeyHash = Arrays.hashCode(rowKey); 116 put.addColumn(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY); 117 put.addColumn(cf, INCREMENT, Bytes.toBytes(rowKeyHash)); 118 if (!isMultiPut) { 119 insert(table, put, rowKeyBase); 120 numCols.addAndGet(1); 121 put = new Put(rowKey); 122 } 123 } 124 if (isMultiPut) { 125 if (verbose) { 126 LOG.debug("Preparing put for key = [" + Bytes.toString(rowKey) + "], " + columnCount 127 + " columns"); 128 } 129 insert(table, put, rowKeyBase); 130 numCols.addAndGet(columnCount); 131 } 132 if (trackWroteKeys) { 133 wroteKeys.add(rowKeyBase); 134 } 135 } 136 } finally { 137 closeHTable(); 138 numThreadsWorking.decrementAndGet(); 139 } 140 } 141 142 public void insert(Table table, Put put, long keyBase) { 143 long start = EnvironmentEdgeManager.currentTime(); 144 try { 145 put = (Put) dataGenerator.beforeMutate(keyBase, put); 146 table.put(put); 147 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 148 } catch (IOException e) { 149 failedKeySet.add(keyBase); 150 String exceptionInfo; 151 if (e instanceof RetriesExhaustedWithDetailsException) { 152 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 153 exceptionInfo = aggEx.getExhaustiveDescription(); 154 } else { 155 StringWriter stackWriter = new StringWriter(); 156 PrintWriter pw = new PrintWriter(stackWriter); 157 e.printStackTrace(pw); 158 pw.flush(); 159 exceptionInfo = StringUtils.stringifyException(e); 160 } 161 LOG.error("Failed to insert: " + keyBase + " after " 162 + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " 163 + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + exceptionInfo); 164 } 165 } 166 167 protected void closeHTable() { 168 try { 169 if (table != null) { 170 table.close(); 171 } 172 } catch (IOException e) { 173 LOG.error("Error closing table", e); 174 } 175 } 176 } 177 178 @Override 179 public void waitForFinish() { 180 super.waitForFinish(); 181 System.out.println("Failed to write keys: " + failedKeySet.size()); 182 for (Long key : failedKeySet) { 183 System.out.println("Failed to write key: " + key); 184 } 185 } 186}