001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.lang.reflect.Constructor;
023import java.lang.reflect.Modifier;
024import java.util.concurrent.ThreadLocalRandom;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
028import org.apache.hbase.thirdparty.com.google.protobuf.Any;
029import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
030import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
031import org.apache.hbase.thirdparty.com.google.protobuf.Message;
032import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
033import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
036import org.apache.hadoop.hbase.util.NonceKey;
037
038/**
039 * Helper to convert to/from ProcedureProtos
040 */
041@InterfaceAudience.Private
042public final class ProcedureUtil {
043  private ProcedureUtil() { }
044
045  // ==========================================================================
046  //  Reflection helpers to create/validate a Procedure object
047  // ==========================================================================
048  private static Procedure<?> newProcedure(String className) throws BadProcedureException {
049    try {
050      Class<?> clazz = Class.forName(className);
051      if (!Modifier.isPublic(clazz.getModifiers())) {
052        throw new Exception("the " + clazz + " class is not public");
053      }
054
055      @SuppressWarnings("rawtypes")
056      Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
057      assert ctor != null : "no constructor found";
058      if (!Modifier.isPublic(ctor.getModifiers())) {
059        throw new Exception("the " + clazz + " constructor is not public");
060      }
061      return ctor.newInstance();
062    } catch (Exception e) {
063      throw new BadProcedureException(
064        "The procedure class " + className + " must be accessible and have an empty constructor",
065        e);
066    }
067  }
068
069  static void validateClass(Procedure<?> proc) throws BadProcedureException {
070    try {
071      Class<?> clazz = proc.getClass();
072      if (!Modifier.isPublic(clazz.getModifiers())) {
073        throw new Exception("the " + clazz + " class is not public");
074      }
075
076      Constructor<?> ctor = clazz.getConstructor();
077      assert ctor != null;
078      if (!Modifier.isPublic(ctor.getModifiers())) {
079        throw new Exception("the " + clazz + " constructor is not public");
080      }
081    } catch (Exception e) {
082      throw new BadProcedureException("The procedure class " + proc.getClass().getName() +
083        " must be accessible and have an empty constructor", e);
084    }
085  }
086
087  // ==========================================================================
088  //  convert to and from Procedure object
089  // ==========================================================================
090
091  /**
092   * A serializer for our Procedures. Instead of the previous serializer, it
093   * uses the stateMessage list to store the internal state of the Procedures.
094   */
095  private static class StateSerializer implements ProcedureStateSerializer {
096    private final ProcedureProtos.Procedure.Builder builder;
097    private int deserializeIndex;
098
099    public StateSerializer(ProcedureProtos.Procedure.Builder builder) {
100      this.builder = builder;
101    }
102
103    @Override
104    public void serialize(Message message) throws IOException {
105      Any packedMessage = Any.pack(message);
106      builder.addStateMessage(packedMessage);
107    }
108
109    @Override
110    public <M extends Message> M deserialize(Class<M> clazz)
111        throws IOException {
112      if (deserializeIndex >= builder.getStateMessageCount()) {
113        throw new IOException("Invalid state message index: " + deserializeIndex);
114      }
115
116      try {
117        Any packedMessage = builder.getStateMessage(deserializeIndex++);
118        return packedMessage.unpack(clazz);
119      } catch (InvalidProtocolBufferException e) {
120        throw e.unwrapIOException();
121      }
122    }
123  }
124
125  /**
126   * A serializer (deserializer) for those Procedures which were serialized
127   * before this patch. It deserializes the old, binary stateData field.
128   */
129  private static class CompatStateSerializer implements ProcedureStateSerializer {
130    private InputStream inputStream;
131
132    public CompatStateSerializer(InputStream inputStream) {
133      this.inputStream = inputStream;
134    }
135
136    @Override
137    public void serialize(Message message) throws IOException {
138      throw new UnsupportedOperationException();
139    }
140
141    @SuppressWarnings("unchecked")
142    @Override
143    public <M extends Message> M deserialize(Class<M> clazz)
144        throws IOException {
145      Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType();
146      try {
147        return parser.parseDelimitedFrom(inputStream);
148      } catch (InvalidProtocolBufferException e) {
149        throw e.unwrapIOException();
150      }
151    }
152  }
153
154  /**
155   * Helper to convert the procedure to protobuf.
156   * <p/>
157   * Used by ProcedureStore implementations.
158   */
159  public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
160      throws IOException {
161    Preconditions.checkArgument(proc != null);
162    validateClass(proc);
163
164    final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
165      .setClassName(proc.getClass().getName())
166      .setProcId(proc.getProcId())
167      .setState(proc.getState())
168      .setSubmittedTime(proc.getSubmittedTime())
169      .setLastUpdate(proc.getLastUpdate());
170
171    if (proc.hasParent()) {
172      builder.setParentId(proc.getParentProcId());
173    }
174
175    if (proc.hasTimeout()) {
176      builder.setTimeout(proc.getTimeout());
177    }
178
179    if (proc.hasOwner()) {
180      builder.setOwner(proc.getOwner());
181    }
182
183    final int[] stackIds = proc.getStackIndexes();
184    if (stackIds != null) {
185      for (int i = 0; i < stackIds.length; ++i) {
186        builder.addStackId(stackIds[i]);
187      }
188    }
189
190    if (proc.hasException()) {
191      RemoteProcedureException exception = proc.getException();
192      builder.setException(
193        RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
194    }
195
196    final byte[] result = proc.getResult();
197    if (result != null) {
198      builder.setResult(UnsafeByteOperations.unsafeWrap(result));
199    }
200
201    ProcedureStateSerializer serializer = new StateSerializer(builder);
202    proc.serializeStateData(serializer);
203
204    if (proc.getNonceKey() != null) {
205      builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
206      builder.setNonce(proc.getNonceKey().getNonce());
207    }
208
209    if (proc.hasLock()) {
210      builder.setLocked(true);
211    }
212
213    if (proc.isBypass()) {
214      builder.setBypass(true);
215    }
216    return builder.build();
217  }
218
219  /**
220   * Helper to convert the protobuf procedure.
221   * <p/>
222   * Used by ProcedureStore implementations.
223   * <p/>
224   * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
225   * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
226   * it by storing the data only on insert().
227   */
228  public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
229      throws IOException {
230    // Procedure from class name
231    Procedure<?> proc = newProcedure(proto.getClassName());
232
233    // set fields
234    proc.setProcId(proto.getProcId());
235    proc.setState(proto.getState());
236    proc.setSubmittedTime(proto.getSubmittedTime());
237    proc.setLastUpdate(proto.getLastUpdate());
238
239    if (proto.hasParentId()) {
240      proc.setParentProcId(proto.getParentId());
241    }
242
243    if (proto.hasOwner()) {
244      proc.setOwner(proto.getOwner());
245    }
246
247    if (proto.hasTimeout()) {
248      proc.setTimeout(proto.getTimeout());
249    }
250
251    if (proto.getStackIdCount() > 0) {
252      proc.setStackIndexes(proto.getStackIdList());
253    }
254
255    if (proto.hasException()) {
256      assert proc.getState() == ProcedureProtos.ProcedureState.FAILED ||
257             proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK :
258             "The procedure must be failed (waiting to rollback) or rolledback";
259      proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
260    }
261
262    if (proto.hasResult()) {
263      proc.setResult(proto.getResult().toByteArray());
264    }
265
266    if (proto.getNonce() != HConstants.NO_NONCE) {
267      proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
268    }
269
270    if (proto.getLocked()) {
271      proc.lockedWhenLoading();
272    }
273
274    if (proto.getBypass()) {
275      proc.bypass(null);
276    }
277
278    ProcedureStateSerializer serializer = null;
279
280    if (proto.getStateMessageCount() > 0) {
281      serializer = new StateSerializer(proto.toBuilder());
282    } else if (proto.hasStateData()) {
283      InputStream inputStream = proto.getStateData().newInput();
284      serializer = new CompatStateSerializer(inputStream);
285    }
286
287    if (serializer != null) {
288      proc.deserializeStateData(serializer);
289    }
290
291    return proc;
292  }
293
294  // ==========================================================================
295  //  convert from LockedResource object
296  // ==========================================================================
297
298  public static LockServiceProtos.LockedResourceType convertToProtoResourceType(
299      LockedResourceType resourceType) {
300    return LockServiceProtos.LockedResourceType.valueOf(resourceType.name());
301  }
302
303  public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) {
304    return LockServiceProtos.LockType.valueOf(lockType.name());
305  }
306
307  public static LockServiceProtos.LockedResource convertToProtoLockedResource(
308      LockedResource lockedResource) throws IOException {
309    LockServiceProtos.LockedResource.Builder builder =
310        LockServiceProtos.LockedResource.newBuilder();
311
312    builder
313        .setResourceType(convertToProtoResourceType(lockedResource.getResourceType()))
314        .setResourceName(lockedResource.getResourceName())
315        .setLockType(convertToProtoLockType(lockedResource.getLockType()));
316
317    Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure();
318
319    if (exclusiveLockOwnerProcedure != null) {
320      ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto =
321          convertToProtoProcedure(exclusiveLockOwnerProcedure);
322      builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto);
323    }
324
325    builder.setSharedLockCount(lockedResource.getSharedLockCount());
326
327    for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) {
328      ProcedureProtos.Procedure waitingProcedureProto =
329          convertToProtoProcedure(waitingProcedure);
330      builder.addWaitingProcedures(waitingProcedureProto);
331    }
332
333    return builder.build();
334  }
335
336  /**
337   * Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max
338   * backoff time is 10 minutes. This is the general backoff policy for most procedure
339   * implementation.
340   */
341  public static long getBackoffTimeMs(int attempts) {
342    long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now.
343    // avoid overflow
344    if (attempts >= 30) {
345      return maxBackoffTime;
346    }
347    long backoffTimeMs = Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime);
348    // 1% possible jitter
349    long jitter = (long) (backoffTimeMs * ThreadLocalRandom.current().nextFloat() * 0.01f);
350    return backoffTimeMs + jitter;
351  }
352}