001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.util.Collection; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.Map; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 029 030/** 031 * This class is used to track the active procedures when loading procedures from proc wal file. 032 * <p/> 033 * We will read proc wal files from new to old, but when reading a proc wal file, we will still read 034 * from top to bottom, so there are two groups of methods for this class. 035 * <p/> 036 * The first group is {@link #add(ProcedureProtos.Procedure)} and {@link #remove(long)}. It is used 037 * when reading a proc wal file. In these methods, for the same procedure, typically the one comes 038 * later should win, please see the comment for 039 * {@link #isIncreasing(ProcedureProtos.Procedure, ProcedureProtos.Procedure)} to see the 040 * exceptions. 041 * <p/> 042 * The second group is {@link #merge(WALProcedureMap)}. We will have a global 043 * {@link WALProcedureMap} to hold global the active procedures, and a local {@link WALProcedureMap} 044 * to hold the active procedures for the current proc wal file. And when we finish reading a proc 045 * wal file, we will merge the local one into the global one, by calling the 046 * {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this 047 * method, for the same procedure, the one comes earlier will win, as we read the proc wal files 048 * from new to old(the reverse order). 049 */ 050@InterfaceAudience.Private 051class WALProcedureMap { 052 053 private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class); 054 055 private final Map<Long, ProcedureProtos.Procedure> procMap = new HashMap<>(); 056 057 private long minModifiedProcId = Long.MAX_VALUE; 058 059 private long maxModifiedProcId = Long.MIN_VALUE; 060 061 private void trackProcId(long procId) { 062 minModifiedProcId = Math.min(minModifiedProcId, procId); 063 maxModifiedProcId = Math.max(maxModifiedProcId, procId); 064 } 065 066 /** 067 * @return True if this new procedure is 'richer' than the current one else false and we log this 068 * incidence where it appears that the WAL has older entries appended after newer ones. 069 * See HBASE-18152. 070 */ 071 private static boolean isIncreasing(ProcedureProtos.Procedure current, 072 ProcedureProtos.Procedure candidate) { 073 // Check that the procedures we see are 'increasing'. We used to compare 074 // procedure id first and then update time but it can legitimately go backwards if the 075 // procedure is failed or rolled back so that was unreliable. Was going to compare 076 // state but lets see if comparing update time enough (unfortunately this issue only 077 // seen under load...) 078 boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate(); 079 if (!increasing) { 080 LOG.warn("NOT INCREASING! current=" + current + ", candidate=" + candidate); 081 } 082 return increasing; 083 } 084 085 public void add(ProcedureProtos.Procedure proc) { 086 procMap.compute(proc.getProcId(), (procId, existingProc) -> { 087 if (existingProc == null || isIncreasing(existingProc, proc)) { 088 return proc; 089 } else { 090 return existingProc; 091 } 092 }); 093 trackProcId(proc.getProcId()); 094 } 095 096 public void remove(long procId) { 097 procMap.remove(procId); 098 } 099 100 public boolean isEmpty() { 101 return procMap.isEmpty(); 102 } 103 104 public boolean contains(long procId) { 105 return procMap.containsKey(procId); 106 } 107 108 /** 109 * Merge the given {@link WALProcedureMap} into this one. The {@link WALProcedureMap} passed in 110 * will be cleared after merging. 111 */ 112 public void merge(WALProcedureMap other) { 113 other.procMap.forEach(procMap::putIfAbsent); 114 maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId); 115 minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId); 116 other.procMap.clear(); 117 other.maxModifiedProcId = Long.MIN_VALUE; 118 other.minModifiedProcId = Long.MAX_VALUE; 119 } 120 121 public Collection<ProcedureProtos.Procedure> getProcedures() { 122 return Collections.unmodifiableCollection(procMap.values()); 123 } 124 125 public long getMinModifiedProcId() { 126 return minModifiedProcId; 127 } 128 129 public long getMaxModifiedProcId() { 130 return maxModifiedProcId; 131 } 132}