001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.util.PriorityQueue;
022import java.util.Queue;
023import java.util.Set;
024import java.util.concurrent.ArrayBlockingQueue;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.ConcurrentSkipListSet;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/** Creates multiple threads that write key/values into the */
038public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
039  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class);
040
041  /**
042   * A temporary place to keep track of inserted/updated keys. This is written to by all writers and
043   * is drained on a separate thread that populates {@link #wroteUpToKey}, the maximum key in the
044   * contiguous range of keys being inserted/updated. This queue is supposed to stay small.
045   */
046  protected BlockingQueue<Long> wroteKeys;
047
048  /**
049   * This is the current key to be inserted/updated by any thread. Each thread does an atomic get
050   * and increment operation and inserts the current value.
051   */
052  protected AtomicLong nextKeyToWrite = new AtomicLong();
053
054  /**
055   * The highest key in the contiguous range of keys .
056   */
057  protected AtomicLong wroteUpToKey = new AtomicLong();
058
059  /** The sorted set of keys NOT inserted/updated by the writers */
060  protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>();
061
062  /**
063   * The total size of the temporary inserted/updated key set that have not yet lined up in a our
064   * contiguous sequence starting from startKey. Supposed to stay small.
065   */
066  protected AtomicLong wroteKeyQueueSize = new AtomicLong();
067
068  /** Enable this if used in conjunction with a concurrent reader. */
069  protected boolean trackWroteKeys;
070
071  public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
072    TableName tableName, String actionLetter) throws IOException {
073    super(dataGen, conf, tableName, actionLetter);
074    this.wroteKeys = createWriteKeysQueue(conf);
075  }
076
077  protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
078    return new ArrayBlockingQueue<>(10000);
079  }
080
081  @Override
082  public void start(long startKey, long endKey, int numThreads) throws IOException {
083    super.start(startKey, endKey, numThreads);
084    nextKeyToWrite.set(startKey);
085    wroteUpToKey.set(startKey - 1);
086
087    if (trackWroteKeys) {
088      new Thread(new WroteKeysTracker(),
089        "MultiThreadedWriterBase-WroteKeysTracker-" + EnvironmentEdgeManager.currentTime()).start();
090      numThreadsWorking.incrementAndGet();
091    }
092  }
093
094  protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
095    HRegionLocation cached = null, real = null;
096    try {
097      cached = connection.getRegionLocation(tableName, rowKey, false);
098      real = connection.getRegionLocation(tableName, rowKey, true);
099    } catch (Throwable t) {
100      // Cannot obtain region information for another catch block - too bad!
101    }
102    String result = "no information can be obtained";
103    if (cached != null) {
104      result = "cached: " + cached.toString();
105    }
106    if (real != null && real.getServerName() != null) {
107      if (cached != null && cached.getServerName() != null && real.equals(cached)) {
108        result += "; cache is up to date";
109      } else {
110        result = (cached != null) ? (result + "; ") : "";
111        result += "real: " + real.toString();
112      }
113    }
114    return result;
115  }
116
117  /**
118   * A thread that keeps track of the highest key in the contiguous range of inserted/updated keys.
119   */
120  private class WroteKeysTracker implements Runnable {
121
122    @Override
123    public void run() {
124      Thread.currentThread().setName(getClass().getSimpleName());
125      try {
126        long expectedKey = startKey;
127        Queue<Long> sortedKeys = new PriorityQueue<>();
128        while (expectedKey < endKey) {
129          // Block until a new element is available.
130          Long k;
131          try {
132            k = wroteKeys.poll(1, TimeUnit.SECONDS);
133          } catch (InterruptedException e) {
134            LOG.info("Inserted key tracker thread interrupted", e);
135            break;
136          }
137          if (k == null) {
138            continue;
139          }
140          if (k == expectedKey) {
141            // Skip the "sorted key" queue and consume this key.
142            wroteUpToKey.set(k);
143            ++expectedKey;
144          } else {
145            sortedKeys.add(k);
146          }
147
148          // See if we have a sequence of contiguous keys lined up.
149          while (!sortedKeys.isEmpty() && ((k = sortedKeys.peek()) == expectedKey)) {
150            sortedKeys.poll();
151            wroteUpToKey.set(k);
152            ++expectedKey;
153          }
154
155          wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
156        }
157      } catch (Exception ex) {
158        LOG.error("Error in inserted/updaed key tracker", ex);
159      } finally {
160        numThreadsWorking.decrementAndGet();
161      }
162    }
163  }
164
165  public int getNumWriteFailures() {
166    return failedKeySet.size();
167  }
168
169  /**
170   * The max key until which all keys have been inserted/updated (successfully or not).
171   * @return the last key that we have inserted/updated all keys up to (inclusive)
172   */
173  public long wroteUpToKey() {
174    return wroteUpToKey.get();
175  }
176
177  public boolean failedToWriteKey(long k) {
178    return failedKeySet.contains(k);
179  }
180
181  @Override
182  protected String progressInfo() {
183    StringBuilder sb = new StringBuilder();
184    appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
185    appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
186    return sb.toString();
187  }
188
189  /**
190   * Used for a joint write/read workload. Enables tracking the last inserted/updated key, which
191   * requires a blocking queue and a consumer thread.
192   * @param enable whether to enable tracking the last inserted/updated key
193   */
194  public void setTrackWroteKeys(boolean enable) {
195    trackWroteKeys = enable;
196  }
197}