/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared state and bookkeeping for multi-threaded actions that insert/update keys in a table.
 * <p>
 * This class hands out the next key to write ({@link #nextKeyToWrite}), records keys that could
 * not be written ({@link #failedKeySet}) and, when enabled through
 * {@link #setTrackWroteKeys(boolean)}, runs a background thread that maintains
 * {@link #wroteUpToKey}, the highest key of the contiguous range of processed keys. The writer
 * threads themselves are not defined in this class (presumably subclasses provide them).
 */
public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class);

  /**
   * A temporary place to keep track of inserted/updated keys. This is written to by all writers
   * and is drained on a separate thread that populates {@link #wroteUpToKey}, the maximum key in
   * the contiguous range of keys being inserted/updated. This queue is supposed to stay small.
   */
  protected BlockingQueue<Long> wroteKeys;

  /**
   * This is the current key to be inserted/updated by any thread. Each thread does an atomic get
   * and increment operation and inserts the current value.
   */
  protected AtomicLong nextKeyToWrite = new AtomicLong();

  /**
   * The highest key in the contiguous range of keys, starting from the start key, that have been
   * inserted/updated (successfully or not). Initialized to {@code startKey - 1} by
   * {@link #start(long, long, int)}.
   */
  protected AtomicLong wroteUpToKey = new AtomicLong();

  /** The sorted set of keys NOT inserted/updated by the writers. */
  protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>();

  /**
   * The total size of the temporary inserted/updated key set that have not yet lined up in our
   * contiguous sequence starting from startKey. Supposed to stay small.
   */
  protected AtomicLong wroteKeyQueueSize = new AtomicLong();

  /** Enable this if used in conjunction with a concurrent reader. */
  protected boolean trackWroteKeys;

  /**
   * Creates the writer base and allocates the queue used to report written keys.
   * @param dataGen      generator passed through to the superclass
   * @param conf         configuration passed to the superclass and to
   *                     {@link #createWriteKeysQueue(Configuration)}
   * @param tableName    table passed through to the superclass
   * @param actionLetter action identifier passed through to the superclass
   * @throws IOException if the superclass constructor fails
   */
  public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
    TableName tableName, String actionLetter) throws IOException {
    super(dataGen, conf, tableName, actionLetter);
    this.wroteKeys = createWriteKeysQueue(conf);
  }

  /**
   * Creates the queue that stores written keys until the tracker thread consumes them.
   * <p>
   * NOTE: this is invoked from the constructor, so an override must not depend on state that the
   * subclass initializes in its own constructor.
   * @param conf the configuration (unused by this default implementation)
   * @return a bounded queue with a capacity of 10000 keys
   */
  protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
    return new ArrayBlockingQueue<>(10000);
  }

  /**
   * Starts the action, resets the key counters for the given range and, if key tracking is
   * enabled, launches the thread that maintains {@link #wroteUpToKey}.
   * @param startKey   first key to write; {@link #wroteUpToKey} is reset to {@code startKey - 1}
   * @param endKey     key at which the tracker stops (exclusive)
   * @param numThreads number of threads, passed through to the superclass
   * @throws IOException if the superclass fails to start
   */
  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);
    nextKeyToWrite.set(startKey);
    wroteUpToKey.set(startKey - 1);

    if (trackWroteKeys) {
      Thread tracker = new Thread(new WroteKeysTracker(),
        "MultiThreadedWriterBase-WroteKeysTracker-" + EnvironmentEdgeManager.currentTime());
      // Count the tracker as a working thread BEFORE it starts. It decrements the counter when it
      // finishes, and it may finish immediately (e.g. an empty key range); incrementing afterwards
      // would let observers see a transiently wrong (even negative) count.
      numThreadsWorking.incrementAndGet();
      try {
        tracker.start();
      } catch (RuntimeException | Error e) {
        // The thread never ran, so its finally block will not undo the increment for us.
        numThreadsWorking.decrementAndGet();
        throw e;
      }
    }
  }

  /**
   * Builds a human-readable description of where the given row is located, for diagnostics. Never
   * throws: any failure while looking up the region is logged at debug level and ignored.
   * @param table  unused by this implementation; the locator is obtained from the connection
   * @param rowKey the row whose region location is wanted
   * @return the cached and/or freshly looked-up region location, or
   *         {@code "no information can be obtained"} if neither is available
   */
  protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
    HRegionLocation cached = null;
    HRegionLocation real = null;
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      // Second argument is the "reload" flag: false reads the cache, true forces a fresh lookup.
      cached = locator.getRegionLocation(rowKey, false);
      real = locator.getRegionLocation(rowKey, true);
    } catch (Throwable t) {
      // Deliberately best-effort: this is only debug information, typically gathered while
      // another failure is already being handled, so do not propagate anything from here.
      LOG.debug("Could not obtain region information for debugging", t);
    }
    String result = "no information can be obtained";
    if (cached != null) {
      result = "cached: " + cached.toString();
    }
    if (real != null && real.getServerName() != null) {
      if (cached != null && cached.getServerName() != null && real.equals(cached)) {
        result += "; cache is up to date";
      } else {
        // Drop the "no information" placeholder when there is no cached location to report.
        result = (cached != null) ? (result + "; ") : "";
        result += "real: " + real.toString();
      }
    }
    return result;
  }

  /**
   * A thread that keeps track of the highest key in the contiguous range of inserted/updated keys.
   * <p>
   * Keys arrive on {@link #wroteKeys} in arbitrary order. Keys that are ahead of the next expected
   * key are parked in a local min-heap until the gap before them is filled.
   */
  private class WroteKeysTracker implements Runnable {

    @Override
    public void run() {
      Thread.currentThread().setName(getClass().getSimpleName());
      try {
        long expectedKey = startKey;
        Queue<Long> sortedKeys = new PriorityQueue<>();
        while (expectedKey < endKey) {
          // Wait up to a second for a new element, then re-check the loop condition.
          Long k;
          try {
            k = wroteKeys.poll(1, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
            LOG.info("Inserted key tracker thread interrupted", e);
            // Restore the interrupt status so that code further up the stack can observe it.
            Thread.currentThread().interrupt();
            break;
          }
          if (k == null) {
            // Timed out without receiving a key.
            continue;
          }
          if (k == expectedKey) {
            // Skip the "sorted key" queue and consume this key.
            wroteUpToKey.set(k);
            ++expectedKey;
          } else {
            sortedKeys.add(k);
          }

          // See if we have a sequence of contiguous keys lined up.
          Long head;
          while ((head = sortedKeys.peek()) != null && head == expectedKey) {
            sortedKeys.poll();
            wroteUpToKey.set(head);
            ++expectedKey;
          }

          wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
        }
      } catch (Exception ex) {
        LOG.error("Error in inserted/updaed key tracker", ex);
      } finally {
        numThreadsWorking.decrementAndGet();
      }
    }
  }

  /**
   * Returns the number of keys that failed to be written.
   * @return the current size of {@link #failedKeySet}
   */
  public int getNumWriteFailures() {
    return failedKeySet.size();
  }

  /**
   * The max key until which all keys have been inserted/updated (successfully or not).
   * @return the last key that we have inserted/updated all keys up to (inclusive)
   */
  public long wroteUpToKey() {
    return wroteUpToKey.get();
  }

  /**
   * Tells whether the given key is recorded as a failed write.
   * @param k the key to check
   * @return {@code true} if {@code k} is in {@link #failedKeySet}
   */
  public boolean failedToWriteKey(long k) {
    return failedKeySet.contains(k);
  }

  /**
   * Reports the tracker's progress: the highest contiguous key and the number of keys still
   * waiting to be lined up.
   */
  @Override
  protected String progressInfo() {
    StringBuilder sb = new StringBuilder();
    appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
    appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
    return sb.toString();
  }

  /**
   * Used for a joint write/read workload. Enables tracking the last inserted/updated key, which
   * requires a blocking queue and a consumer thread. The flag is read by
   * {@link #start(long, long, int)}, so it must be set before starting.
   * @param enable whether to enable tracking the last inserted/updated key
   */
  public void setTrackWroteKeys(boolean enable) {
    trackWroteKeys = enable;
  }
}