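The program (OptorSim) throws a java.util.ConcurrentModificationException
on the line stats.put("jobTimesWithQueue", new LinkedHashMap(_jobTimesWithQueue));
This is an open-source program and I have not changed anything. Can someone tell me what this exception means, what causes it, and how to avoid it?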
Exception in thread ConcurrentModificationException
at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:744)
at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:742)
at java.util.HashMap.putMapEntries(HashMap.java:511)
at java.util.LinkedHashMap.<init>(LinkedHashMap.java:384)
at org.edg.data.replication.optorsim.SimpleComputingElement.getStatistics(SimpleComputingElement.java:164)
at org.edg.data.replication.optorsim.GridDataThread.run(GridDataThread.java:95)
public Statistics getStatistics() {
Map stats = new HashMap();
// After remove see the result here.
OptorSimParameters params = OptorSimParameters.getInstance();
float _usage = _time.getTimeMillis() - _startRunning == 0 ? 0 : 100 *_workingTime/(_time.getTimeMillis() - _startRunning);
stats.put("usage", new Float(_usage));
stats.put("remoteReads", new Long(_remoteReads));
stats.put("localReads", new Long(_localReads));
if( params.outputStatistics() ==3) {
stats.put("jobTimes", new LinkedHashMap( _jobTimes));
stats.put("jobTimesWithQueue", new LinkedHashMap(_jobTimesWithQueue));
stats.put("jobFiles", new LinkedHashMap(_jobFiles));
stats.put("numberOfJobs", new Integer(_jobsCompleted));
stats.put("workerNodes", new Integer(_workerNodes));
stats.put("status", new Boolean(_active));
stats.put("queueLength", new Integer(_inputJobHandler.getQueueSize()));
stats.put("runnableStatus", new Boolean(_runnable));
}
stats.put("totalJobTime", new Float(_totalJobTime/(float)1000));
long meanJobTime = 0;
if (_jobsCompleted!=0)
meanJobTime = _workingTime/_jobsCompleted;
/////////////////////////////////////////
stats.put("meanJobTime", new Long(meanJobTime));
return new Statistics(this, stats);
}
**Edit:**
Here are the full details of the error:
Exception in thread "Thread-72" java.util.ConcurrentModificationException
at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:744)
at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:742)
at java.util.HashMap.putMapEntries(HashMap.java:511)
at java.util.LinkedHashMap.<init>(LinkedHashMap.java:384)
at org.edg.data.replication.optorsim.SimpleComputingElement.getStatistics(SimpleComputingElement.java:174)
at org.edg.data.replication.optorsim.SiteDataThread.run(SiteDataThread.java:112)
When I click on the first LinkedHashMap frame, I get:
final LinkedHashMap.Entry<K,V> nextNode() {
LinkedHashMap.Entry<K,V> e = next;
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
if (e == null)
throw new NoSuchElementException();
current = e;
next = e.after;
return e;
}
The second LinkedHashMap frame:
final class LinkedEntryIterator extends LinkedHashIterator
implements Iterator<Map.Entry<K,V>> {
public final Map.Entry<K,V> next() { return nextNode(); }
}
The error is on the third line.
The third LinkedHashMap frame:
final class LinkedEntryIterator extends LinkedHashIterator
implements Iterator<Map.Entry<K,V>> {
public final Map.Entry<K,V> next() { return nextNode(); }
}
The error is on the first line.
The HashMap frame:
for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
K key = e.getKey();
V value = e.getValue();
putVal(hash(key), key, value, false, evict);
}
The LinkedHashMap constructor frame:
public LinkedHashMap(Map<? extends K, ? extends V> m) {
super();
accessOrder = false;
putMapEntries(m, false);
}
The error is on the line:
putMapEntries(m, false);
at org.edg.data.replication.optorsim.SimpleComputingElement.getStatistics(SimpleComputingElement.java:174)
This is the full code of SimpleComputingElement:
package org.edg.data.replication.optorsim;
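/**
 * The ComputingElement runs a thread which executes the GridJobs
 * given to it through its {@link JobHandler}. For each file required,
 * the ComputingElement calls getBestFile(), which returns
 * the location of the best replica of the file according to
 * the chosen optimisation algorithm, which may or may not have performed
 * replication. The ComputingElement reads the file from this location
 * and processes it. The time to process the file is calculated as
 * the time specified in the parameters file divided by the number of worker
 * nodes in the ComputingElement.
 *
 * Each ComputingElement can currently run only one job at a time.
 * Information on the time taken for each job can be found in
 * the statistics output at the end of the simulation if statistics level 3
 * is selected in the parameters file, or from the job time histograms
 * if the GUI is used.
 *
 * Copyright (c) 2002 CERN, ITC-irst, PPARC, on behalf of the EU DataGrid.
 * For license conditions see the LICENSE file or
 * http://www.edg.org/license.html
 *
 * @since JDK1.4
 */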
public class SimpleComputingElement implements ComputingElement {
private static int _LastCEId = 0;
private GridSite _site;
private String _ceName;
private boolean _imAlive;
private boolean _paused = false;
private int _CEId;
private long _workingTime = 0;
private long _startRunning;
private long _totalJobTime = 0;
private Map _jobTimes = new LinkedHashMap();
private Map _jobTimesWithQueue = new LinkedHashMap();
private Map _jobFiles = new LinkedHashMap();
private int _jobsCompleted = 0;
protected JobHandler _inputJobHandler;
protected boolean _runnable = false;
protected boolean _active=false;
protected long _remoteReads = 0;
protected long _localReads = 0;
protected int _workerNodes = 0;
protected float _workerCapacity = 0;
protected GridTime _time;
public SimpleComputingElement( GridSite site, int workerNodes, float capacity) {
OptorSimParameters params = OptorSimParameters.getInstance();
_time = GridTimeFactory.getGridTime();
_site = site;
_workerNodes = workerNodes;
_workerCapacity = capacity;
_CEId = ++_LastCEId;
_ceName = "CE"+_CEId+"@"+_site;
_inputJobHandler = new JobHandler( params.getMaxQueueSize());
_imAlive = true;
_site.registerCE( this);
_startRunning = _time.getTimeMillis();
}
/**
* Return a more meaningful name.
* @return the CE's name
*/
public String toString() {
return _ceName;
}
/**
* Check whether this CE is active (processing jobs) or idle.
*/
public boolean active() {
return _active;
}
/**
* Check whether this CE is still running or has been shut down.
*/
public boolean imAlive() {
return _imAlive;
}
/**
* A method to return the input sandbox for this computing element.
*/
public JobHandler getJobHandler() {
return _inputJobHandler;
}
/**
* Method to get the site that this CE is on.
* @return The site this CE is on.
*/
public GridSite getSite() {
return _site;
}
/**
* Method to give the name of this CE.
* @return The name of this CE.
*/
public String getCeName() {
return _ceName;
}
public int getWorkerNodes() {
return _workerNodes;
}
/**
* Method to check against our ID
*/
public boolean iAm( int id) {
return _CEId == id;
}
/**
* Method to collate and return information relevant
* to this CE as a {@link Statistics} object.
* @return The statistics of this CE
*/
public Statistics getStatistics() {
Map stats = new HashMap();
OptorSimParameters params = OptorSimParameters.getInstance();
float _usage = _time.getTimeMillis() - _startRunning == 0 ? 0 : 100 *_workingTime/(_time.getTimeMillis() - _startRunning);
stats.put("usage", new Float(_usage));
stats.put("remoteReads", new Long(_remoteReads));
stats.put("localReads", new Long(_localReads));
if( params.outputStatistics() ==3) {
// LinkedHashSet<String> lhs = new LinkedHashSet<String>();
stats.put("jobTimes", new LinkedHashMap( _jobTimes));
stats.put("jobTimesWithQueue", new LinkedHashMap(_jobTimesWithQueue));
stats.put("jobFiles", new LinkedHashMap(_jobFiles));
stats.put("numberOfJobs", new Integer(_jobsCompleted));
stats.put("workerNodes", new Integer(_workerNodes));
stats.put("status", new Boolean(_active));
stats.put("queueLength", new Integer(_inputJobHandler.getQueueSize()));
stats.put("runnableStatus", new Boolean(_runnable));
}
stats.put("totalJobTime", new Float(_totalJobTime/(float)1000));
long meanJobTime = 0;
if (_jobsCompleted!=0)
meanJobTime = _workingTime/_jobsCompleted;
/////////////////////////////////////////
stats.put("meanJobTime", new Long(meanJobTime));
return new Statistics(this, stats);
}
/**
* When running, the ComputingElement processes all the jobs
* submitted to it through the JobHandler, sleeping while the
* JobHandler is empty. It is notified to shut down by the
* ResourceBroker.
*/
public void run() {
// Boost our priority
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
Double execTime;
OptorSimParameters params = OptorSimParameters.getInstance();
_runnable = true;
// to keep thread running
for( GridJob job=null; job != null || _imAlive; ) {
_active=false;
job=_inputJobHandler.get(); // This potentially blocks
// We might get a null job from JobHandler, if so, skip any further activity
if( job == null)
continue;
job.started();
OptorSimOut.println(_ceName+"> starting to process "+job+" (queue length now "+
_inputJobHandler.getQueueSize()+")");
_active=true;
// Install our optimiser
Optimisable replicaOptimiser = OptimiserFactory.getOptimisable( _site);
AccessPatternGenerator accessPatternGenerator
= AccessPatternGeneratorFactory.getAPGenerator(job);
String[] logicalfilenames = new String[1];
List filesAccessed = new LinkedList();
for( String lfn = accessPatternGenerator.getNextFile();
lfn != null;
lfn = accessPatternGenerator.getNextFile()) {
filesAccessed.add(lfn);
// Pack the logical file name into the expected structure:
logicalfilenames[0] = lfn;
float[] fileFractions = new float[1];
fileFractions[0] = (float)1.0;
// Use optimiser to locate best replica of this file
DataFile[] files = replicaOptimiser.getBestFile(logicalfilenames,
fileFractions);
if( files.length != 1) {
System.out.println( "ASSERT FAILED: CE, getBestFile return array with wrong number of entries: "+ files.length +" != 1");
continue; // skip to next file
}
if(files[0] == null) {
System.out.println( _ceName + "> ERROR getBestFile returned"+
" null for "+logicalfilenames[0]);
continue; // skip to next file
}
StorageElement fileSE = files[0].se();
GridSite fileSite = fileSE.getGridSite();
// Special case. If file is remote, then simulate the remoteIO, unPin and move on to next file.
if( _site != fileSite) {
simulateRemoteIO( files[0], fileFractions[0]);
// log this as an access on the close SE (if it exists!)
if(_site.hasSEs())
_site.getCloseSE().accessFile(files[0]);
if(_workerNodes != 0) {
execTime = new Double((job.getLatency() + job.getLinearFactor()*files[0].size())/(_workerNodes*_workerCapacity));
_time.gtSleep(execTime.longValue());
}
files[0].releasePin();
_remoteReads++;
continue;
}
else {
fileSE.accessFile(files[0]);
_localReads++;
}
// process the file
if(_workerNodes != 0) {
execTime = new Double((job.getLatency() + job.getLinearFactor()*files[0].size())/(_workerNodes*_workerCapacity));
// System.out.println(this.toString()+"> processing file...");
_time.gtSleep(execTime.longValue());
}
files[0].releasePin();
//A while loop the ce enters when paused by gui
while(_paused){
_time.gtWait(this);
}
} // for each datafile in job
// statistics logging
long duration = _time.getTimeMillis() - job.timeStarted();
long durationWithQueue = _time.getTimeMillis() - job.timeScheduled();
if( duration < 0) {
OptorSimOut.println("BUG> Duration < 0!!");
}
_totalJobTime += durationWithQueue;
_workingTime += duration;
_jobsCompleted++;
if( params.outputStatistics() == 3 || params.useGui()) {
_jobTimes.put(job.toString(), new Long(duration));
_jobTimesWithQueue.put(job.toString(), new Long(durationWithQueue));
_jobFiles.put( job.toString(), filesAccessed);
}
} // while there are jobs left to run
_runnable = false;
} // run
/**
* A routine used by the CE to simulate remote IO. The GridContainer's copy() method is
* used to block the equivalent amount of time.
*/
protected void simulateRemoteIO( DataFile remoteFile, float fraction)
{
GridContainer gc = GridContainer.getInstance();
gc.copy( remoteFile, _site, fraction);
}
/**
* GUI calls this method to pause the ComputingElement
* threads when pause button is pressed.
*/
public void pauseCE() {
_paused = true;
}
/**
* GUI calls this method to unpause the ComputingElement
* threads when continue button is pressed.
*/
public void unpauseCE() {
_paused = false;
_time.gtNotify(this);
}
/**
* The ResourceBroker calls this method when it has
* distributed all the jobs to shut down the ComputingElement
* threads.
*/
public void shutDownCE(){
_imAlive = false;
}
}
at org.edg.data.replication.optorsim.SiteDataThread.run(SiteDataThread.java:112)
The error is on the line:
st = ce.getStatistics();
{
//get the statistics object for this comp. element
ce = site.getCE();
st = ce.getStatistics();
//sample mean job time
Object r1 = st.getStatistic("meanJobTime");
String stat1 = r1.toString();
int stat1Int = Integer.parseInt(stat1);
seriesSMJTVTime.add(timeSecs, stat1Int);
//sample job times
Object r2 = st.getStatistic("jobTimes");
Map m = (Map)r2;
int pairs = m.size();
//if (number of previous key-value pairs != pairs)
// instantiate new histarray and fill with job time values
if (prevNoOfPairs!=pairs)
{
histarray = new double[pairs];
int i=0;
prevNoOfPairs++;
Set keySet = m.keySet();
Iterator iter = keySet.iterator();
while (iter.hasNext())
{
Object key = iter.next();
Object value = m.get(key);
String duration = value.toString();
float jobTime = Float.parseFloat(duration);
histarray[i] = jobTime;
i++;
}
}
//sample usage
Object r3 = st.getStatistic("usage");
String stat3 = r3.toString();
float coUsage = Float.parseFloat(stat3);
/* if (range values identical for last three readings)
* remove intermediate statistic
*/
if (coUsage==prevCoUsage&&coUsage==prevPrevCoUsage)
{
int itemCount = seriesSSEUVTime.getItemCount();
if (itemCount>2)
seriesSSEUVTime.remove(itemCount-1);
}
prevPrevCoUsage = prevCoUsage;
prevCoUsage = coUsage;
seriesSCEUVTime.add(timeSecs, coUsage);
}
}
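Best answer
A ConcurrentModificationException means that a collection was changed by something other than the iterator currently walking it, here most likely a different thread.
So some other thread is probably modifying the map while this thread iterates over it.
The shared object may be a singleton, because this line: OptorSimParameters.getInstance();
is the usual signature of the Singleton design pattern.
So this means the same object is being changed in two places: the code you showed and some other part of the code. In the code you posted, run() puts entries into _jobTimesWithQueue on the ComputingElement's thread, while getStatistics() copies that same map when called from SiteDataThread, so those two are the likely pair.
Since this is open source, it may simply be a bug you have found. There is not much you can do except find who calls OptorSimParameters.getInstance();
and work out who else calls this method and changes or uses this map.
Hope that makes sense.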
Edit:
This exception occurs in a situation like the following:
Iterator<Point> iter = points.iterator();
while (iter.hasNext()) {
    Point p = iter.next();
    // Another loop inside this one that iterates over the same items
    Iterator<Point> iter2 = points.iterator();
    while (iter2.hasNext()) {
        Point q = iter2.next();
        if (q.equals(pointWeWantToRemove)) {
            iter2.remove();
        }
    }
    // Fails with ConcurrentModificationException once iter2 has removed an item
    iter.remove();
}
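The case above is simply a loop inside a loop. The outer loop removes items one by one, while the inner loop removes one specific Point.
In this case the exception is thrown because both iterators run over the same list and remove items from under each other.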
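For contrast, here is a minimal sketch of removing items with a single iterator, which keeps that iterator's bookkeeping in sync with the list. The class name SafeRemovalDemo and the method removeAll are made up for this illustration, and plain strings stand in for the Point objects above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration only; not part of OptorSim.
class SafeRemovalDemo {

    // Returns a copy of items with every element equal to target removed.
    // Only one iterator touches the list, and removal goes through that
    // iterator, so no ConcurrentModificationException is raised.
    static List<String> removeAll(List<String> items, String target) {
        List<String> result = new ArrayList<>(items);
        Iterator<String> it = result.iterator();
        while (it.hasNext()) {
            if (it.next().equals(target)) {
                it.remove();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> points = Arrays.asList("a", "b", "a", "c");
        System.out.println(removeAll(points, "a"));
    }
}
```

The same rule applies to maps: remove through the iterator of entrySet() or keySet() rather than calling remove() on the map from inside the loop.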
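Once an iterator has been created with
points.iterator()
it records the collection's modification count at that moment, and on every step it checks, as in the nextNode() source you posted:
if (modCount != expectedModCount) {
    throw new ConcurrentModificationException();
}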
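This check can be observed without any threads at all. The sketch below is an illustration only: the class FailFastDemo and its method are invented names, and the map merely imitates the shape of _jobTimesWithQueue. It adds a key to a LinkedHashMap while iterating over it and reports whether the iterator noticed.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical demo class for illustration only; not part of OptorSim.
class FailFastDemo {

    // Returns true if adding a key during iteration trips the fail-fast check.
    static boolean structuralChangeIsDetected() {
        Map<String, Long> jobTimes = new LinkedHashMap<>();
        jobTimes.put("job1", 10L);
        jobTimes.put("job2", 20L);
        try {
            for (String key : jobTimes.keySet()) {
                // Adding a new key is a structural modification: the map's
                // modCount changes while the iterator still holds the old
                // expectedModCount, so the next call to next() throws.
                jobTimes.put(key + "-copy", 0L);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("detected: " + structuralChangeIsDetected());
    }
}
```

With two threads the mechanism is the same, except that the put() happens on another thread, which is why the failure appears only intermittently.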
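So if two iterators, or an iterator and a writer, end up working on the same collection at the same time, this is what happens. You have shown only part of the code; once you find the other place that touches the map, we can think about how to avoid the clash. If the other users of the collection only read it, they can make a clone before reading and iterate over the clone instead. Note that making the clone is itself an iteration (new LinkedHashMap(_jobTimesWithQueue) is exactly where your trace fails), so the copy and every write to the map must be guarded by the same lock.
Hope that makes sense.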
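A minimal sketch of that idea follows, assuming one thread records job times while another takes copies for statistics. The names JobTimeLog, record, snapshot and runDemo are made up for this example; it is not OptorSim's code, only the pattern of guarding writes and copies with one lock.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical class for illustration only; not part of OptorSim.
class JobTimeLog {
    private final Object lock = new Object();
    private final Map<String, Long> jobTimes = new LinkedHashMap<>();

    // Called by the worker thread each time a job finishes.
    void record(String job, long millis) {
        synchronized (lock) {
            jobTimes.put(job, millis);
        }
    }

    // Called by the statistics thread; the copy is made under the same
    // lock as the writes, so the map cannot change while it is iterated.
    Map<String, Long> snapshot() {
        synchronized (lock) {
            return new LinkedHashMap<>(jobTimes);
        }
    }

    // Runs a writer thread and takes snapshots until it finishes;
    // returns the number of entries in the final snapshot.
    static int runDemo(int jobs) {
        JobTimeLog log = new JobTimeLog();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < jobs; i++) {
                log.record("job" + i, i);
            }
        });
        writer.start();
        while (writer.isAlive()) {
            log.snapshot(); // safe to call while the writer is running
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.snapshot().size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(10000));
    }
}
```

In SimpleComputingElement the equivalent change would presumably be to wrap the three put() calls in run() and the three new LinkedHashMap(...) copies in getStatistics() in synchronized blocks on one shared lock object. That is a suggestion based only on the code posted here, not a confirmed upstream fix.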