public abstract class AbstractDistributedTask<RESULT> extends java.lang.Object implements ITask<RESULT>
备注:
1. 消息传输目前只支持基础数据类型，并且采用数组进行发送。
2. 使用者如果觉得框架不方便，可以自行封装。
Modifier and Type | Field and Description |
---|---|
java.util.Set<java.lang.Integer> |
clusterNodeIds |
protected int |
currentDepth |
protected java.util.concurrent.ExecutorService |
executorService |
protected java.util.List<java.util.concurrent.Future<?>> |
futures |
protected Graph |
graph |
protected LoggerUtil |
graphLogger |
protected IdsMap |
idsMap |
protected boolean |
isStop |
protected java.util.concurrent.CountDownLatch |
latchDepth |
protected int |
limit |
protected int |
localClusterNodeId |
protected int |
localEndSeqId |
protected int |
localPartSize |
protected int |
localStartSeqId |
protected IMessageHandler |
messageUtil |
protected int |
partSize |
protected int |
sourceClusterNodeId |
TaskStage |
stage |
protected TaskHandler |
taskHandler |
protected long |
taskId |
Constructor and Description |
---|
AbstractDistributedTask(java.lang.String taskName,
Graph graph,
GraphLogger graphLogger,
int sourceClusterNodeId,
int maxIterations) |
Modifier and Type | Method and Description |
---|---|
protected void |
checkParam(int iterations)
检查迭代轮数是否合法
迭代需要 >= 1
|
boolean |
distributed()
是否为分布式任务.
|
void |
firstRound()
初始化轮
|
abstract void |
firstRoundContext()
第一轮消息发送的方法
|
Graph |
getGraph()
获取图对象.
|
int |
getLimit()
返回结果限制
|
int |
getLocalClusterNodeId() |
LoggerUtil |
getLogger()
获取计算框架写日志对象.
|
ArrayMessage |
getMessage() |
int |
getSourceClusterNodeId() |
boolean |
getStop()
获取计算框架停止值.
|
long |
getTaskId()
获取任务号.
|
java.lang.String |
getTaskName()
返回任务名.
|
TaskStage |
getTaskStage()
获取任务状态.
|
abstract void |
init()
变量相关的初始化方法
|
void |
isRunningOrThrow()
停止异常
|
void |
nextRound(int depth)
下一轮计算
|
abstract void |
nextRoundContext(int depth)
后续轮算法实现
|
protected void |
parallelExecute(java.util.List<java.util.concurrent.Future<?>> futures)
获取并行执行的任务
|
abstract void |
release()
释放申请的变量内存
|
abstract RESULT |
result(int limit)
返回结果
|
void |
setDepth(int depth)
设置当前深度
|
void |
setLatchDepth(java.util.concurrent.CountDownLatch countDownLatch)
设置当前轮
|
ArrayMessage |
setMessage(java.lang.Object sendValues,
java.lang.Object receiveValues)
设置消息发送对象
根据用户传入的特定类型生成对应的消息处理器
目前支持基础数据类型 int、double、long和float
|
abstract ArrayMessage |
setMessageObject()
设置消息发送对象
|
void |
setStop()
设置算法任务停止
|
protected java.util.concurrent.CountDownLatch latchDepth
public TaskStage stage
protected long taskId
protected Graph graph
protected final int sourceClusterNodeId
protected IMessageHandler messageUtil
protected final LoggerUtil graphLogger
protected final TaskHandler taskHandler
protected final int localClusterNodeId
protected final int localStartSeqId
protected final int localEndSeqId
protected int localPartSize
protected final int partSize
protected int limit
protected java.util.concurrent.ExecutorService executorService
protected java.util.List<java.util.concurrent.Future<?>> futures
public final java.util.Set<java.lang.Integer> clusterNodeIds
protected final IdsMap idsMap
protected int currentDepth
protected boolean isStop
public AbstractDistributedTask(java.lang.String taskName, Graph graph, GraphLogger graphLogger, int sourceClusterNodeId, int maxIterations)
public void isRunningOrThrow()
public final void setDepth(int depth)
depth
- 深度
public final void nextRound(int depth)
depth
- 深度
public final void firstRound()
public abstract void firstRoundContext()
public abstract void nextRoundContext(int depth)
depth
- 深度
public abstract ArrayMessage setMessageObject()
public abstract RESULT result(int limit)
public final ArrayMessage setMessage(java.lang.Object sendValues, java.lang.Object receiveValues)
sendValues
- 发送对象
receiveValues
- 接收对象
public java.lang.String getTaskName()
ITask
getTaskName
in interface ITask<RESULT>
public long getTaskId()
ITask
public LoggerUtil getLogger()
ITask
public final void setStop()
ITask
public final boolean getStop()
ITask
public TaskStage getTaskStage()
ITask
getTaskStage
in interface ITask<RESULT>
public int getLocalClusterNodeId()
public final boolean distributed()
ITask
distributed
in interface ITask<RESULT>
protected void parallelExecute(java.util.List<java.util.concurrent.Future<?>> futures)
futures
- call任务集合
public int getSourceClusterNodeId()
protected void checkParam(int iterations)
iterations
- 迭代轮数
public ArrayMessage getMessage()
public final void setLatchDepth(java.util.concurrent.CountDownLatch countDownLatch)
countDownLatch
- 迭代轮
public final int getLimit()