云计算百科
云计算领域专业知识百科平台

构建工业自动化软件框架:流程核心搭建(一) 流程引擎、上下文实现

文章同步在公众号与知乎更新,账号全部同名。搜索“小黄花呀小黄花”即可找到我。

前言

前段时间开始了整个工业自动化软件框架的开发,已经完成了Common部分的全部代码。按照计划,下一步的计划就是流程核心内容的实现。按照规划,流程核心包含以下几部分内容:

1. 单步流程接口
2. 流程引擎
3. 状态机与工站管理
4. 流程间交互的事件总线

我对流程想法首先是,所有流程都是单独存储在json文件中,每次启动从本地读取json,然后按照规划的顺序执行流程。而json的来源可以是界面的编辑的自动存储,也可以是来自代码手动编写好后的手动存储,这到了界面开发的时候具体再看。

然后是各个流程之间的交互,计划使用事件来实现,这样避免了全局变量的使用,也可以更好的解耦。

最后是流程的执行,现在计划是回零和运行共用同样的工站管理,也就是一个工站拥有一个回零工站和一个运行工站,后续会在实现的过程中,再根据可行性和方便性进行优化。

本篇主要先实现流程引擎/上下文,这部分也是我这个软件的核心功能。

 类图

代码实现

流程引擎

引擎的主要作用是完成流程的读取以及流程的执行,这部分还包含一个流程控制器接口,主要是将流程引擎的控制独立出来。

接口

/// <summary>
/// 流程引擎接口
/// <summary>
public interface IFlowEngine
{
/// <summary>
/// 流程名称 来自WorkStation
/// </summary>
string FlowName { get; }

/// <summary>
/// 执行流程可等待方法
/// </summary>
/// <param name="context"></param>
/// <param name="cts"></param>
/// <returns></returns>
Task AsyncExecuteFlow(IFlowContext context, CancellationTokenSource cts);
}
/// <summary>
/// 流程控制器接口
/// <summary>
public interface IFlowController
{
/// <summary>
/// 流程已暂停
/// </summary>
bool IsPaused { get; }
/// <summary>
/// 流程已停止
/// </summary>
bool IsStopped { get; }
/// <summary>
/// 流程运行中
/// </summary>
bool IsRunning { get; }
/// <summary>
/// 当前步数
/// </summary>
int CurrentIndex { get; }

void Pause();
void Resume();
void Stop();
}

实现

流程引擎需要同时实现IFlowEngine和IFlowController两个接口。一个用来执行,一个用来控制。

AsyncExecuteFlow方法是该类的核心方法,后续在工站管理中,运行流程时就调用它。方法支持异步执行与取消,CancellationTokenSource实现取消功能,来源是上层的WorkStation。

public class FlowEngine : IFlowEngine, IFlowController
{
#region 构造函数
public FlowEngine(string flowName, IConfigManagerFactory configFactory)
{
FlowName = flowName;
//从文件中加载流程集合
_configManager = configFactory.CreateConfigManager(ConfigType.json, FlowName, "FlowData");
_steps = _configManager.LoadConfig<List<IFlowStep>>();
}
#endregion

#region 属性
public string FlowName { get; }
public bool IsPaused => _isPaused;
public bool IsStopped => _isStopped;
public bool IsRunning => _isRunning;
public int CurrentIndex => _currentIndex;
#endregion

#region 字段
private bool _isPaused;
private bool _isStopped;
private bool _isRunning;
private int _currentIndex;
private ILoggerManager _logger;
private readonly IConfigManager _configManager;
private readonly List<IFlowStep> _steps;
private CancellationTokenSource _cts;
private readonly object _lock = new object();
private IFlowStep _currentStep;
#endregion

#region 方法
public virtual async Task AsyncExecuteFlow(IFlowContext context, CancellationTokenSource cts)
{
_logger = context.Logger;
context.TotalSteps = _steps.Count;
try
{
if (_cts != null)
{
_logger.Info($"流程【{FlowName}】已经在运行中");
return;
}
_isRunning = true;
_cts = cts;
_currentIndex = 0;
while (_currentIndex < _steps.Count && !_isStopped)
{
while (_isPaused && !_isStopped)
{
if (IsCanceled())
{
break;
}
await Task.Delay(100);
}
if (IsCanceled())
{
break;
}
_currentStep = _steps[_currentIndex];
if (_currentStep != null)
{
_logger.Info($"开始执行步骤【{_currentStep.StepName}】…");
var result = await _currentStep.AsyncExecuteStep(context, _cts.Token);
if (result.Status == StepStatus.Failure)
{
string msg = $"步骤【{_currentStep.StepName}】执行失败:{result.Message}";
_logger.Error(msg);
throw new StepExecuteException(msg);
}
}
_currentIndex = context.NextStepIndex;
}
}
catch (OperationCanceledException)
{
_logger.Info($"流程【{FlowName}】被取消");
}
catch (StepExecuteException e)
{
_logger.Error($"流程步骤失败:{e.Message}");
throw;
}
catch (Exception e)
{
_logger.Error($"流程执行异常:{e}");
throw;
}
finally
{
_logger.Info($"流程【{FlowName}】执行完成(或被中止)");
_cts?.Dispose();
_cts = null;
_isStopped = false;
_isPaused = false;
_isRunning = false;
}
}

private bool IsCanceled()
{
if (_cts?.Token.IsCancellationRequested == true)
{
_logger.Info($"流程【{FlowName}】运行已取消");
return true;
}
return false;
}

public void Pause()
{
lock (_lock)
{
if (_isRunning && !_isPaused)
{
_isPaused = true;
_logger.Info($"流程【{FlowName}】已经暂停");
}
}
}

public virtual void Resume()
{
lock (_lock)
{
if (_isRunning && _isPaused)
{
_logger.Info($"流程【{FlowName}】已经恢复");
_isPaused = false;
}
}
}

public virtual void Stop()
{
lock (_lock)
{
_isStopped = true;
}
}
#endregion
}

流程上下文

上下文主要用来存储整个流程中使用到的数据,日志也是放在上下文中,这样流程中任何地方都可以使用同一个日志。同时包含一个Clone浅拷贝方法,高效支持并行流程,后续于单步流程中需要执行一段流程时使用,主要是并行和循环流程使用。

接口

接口中包含包含存储数据_data,这个数据不管有没有分支流程,一个流程中全部都公用一个,存储全局状态。其他参数在并行或者循环时应该分别控制。

public interface IFlowContext
{
/// <summary>
/// 流程名称 来自WorkStation
/// </summary>
string FlowName { get; }
/// <summary>
/// 从单步中获取到的下一步索引
/// </summary>
int NextStepIndex { get; set; }
/// <summary>
/// 流程总步数
/// </summary>
int TotalSteps { get; set; }
/// <summary>
/// 存储流程中的数据
/// </summary>
Dictionary<string, object> Data { get; }
/// <summary>
/// Flow使用日志
/// </summary>
ILoggerManager Logger { get; }

T GetData<T>(string key);
void SetData<T>(string key, T value);
IFlowContext Clone();
}

实现

public class FlowContext : IFlowContext
{
#region 构造函数
public FlowContext(string flowName, ILoggerFactory loggerFactory)
{
FlowName = flowName;
_loggerFactory = loggerFactory;
Logger = _loggerFactory.CreateLogger(flowName);
_data = new Dictionary<string, object>();
}
#endregion

#region 属性
public string FlowName { get; }
public int NextStepIndex { get; set; }
public int TotalSteps { get; set; }
public Dictionary<string, object> Data => _data;
public ILoggerManager Logger { get; }
#endregion

#region 字段
private readonly Dictionary<string, object> _data;
private readonly ILoggerFactory _loggerFactory;
#endregion

#region 方法
public T GetData<T>(string key)
{
return _data.TryGetValue(key, out var value) ? (T)value : default;
}

public void SetData<T>(string key, T value)
{
_data[key] = value;
}

/// <summary>
/// data为浅拷贝,数据公用,其余字段不共用
/// </summary>
/// <returns></returns>
public IFlowContext Clone()
{
var cloned = new FlowContext(this.FlowName, _loggerFactory);
var dataField = typeof(FlowContext).GetField("_data", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
dataField?.SetValue(cloned, this._data);
return cloned;
}
#endregion
}

工厂

与前面的日志一样,由于构造函数要传入名称和其他的依赖注入,所以使用工厂模式来创建流程引擎和上下文。

public class FlowContextFactory : IFlowContextFactory
{
#region 构造函数
public FlowContextFactory(ILoggerFactory loggerFactory)
{
_loggerfactory = loggerFactory;
}
#endregion

#region 字段
private readonly ConcurrentDictionary<string, IFlowContext> _flowContextcache = new ConcurrentDictionary<string, IFlowContext>();

private readonly ILoggerFactory _loggerfactory;
#endregion

#region 方法
public IFlowContext CreateFlowContext(string flowName)
{
return _flowContextcache.GetOrAdd(flowName, new FlowContext(flowName, _loggerfactory));
}
#endregion
}

public class FlowEngineFactory : IFlowEngineFactory
{
#region 构造函数
public FlowEngineFactory(IConfigManagerFactory configFactory)
{
_configFactory = configFactory;
}
#endregion

#region 字段
private readonly ConcurrentDictionary<string, IFlowEngine> _flowEnginecache = new ConcurrentDictionary<string, IFlowEngine>();

private readonly IConfigManagerFactory _configFactory;
#endregion

#region 方法
public IFlowEngine CreateFlowEngine(string flowName)
{
return _flowEnginecache.GetOrAdd(flowName, new FlowEngine(flowName, _configFactory));
}
#endregion
}

容器注册

工厂注册到容器中,然后通过DI的方式再注入后续工站管理中,最后在工站管理中创建实例。

builder.RegisterType<FlowEngineFactory>()
.As<IFlowEngineFactory>()
.SingleInstance();

builder.RegisterType<FlowContextFactory>()
.As<IFlowContextFactory>()
.SingleInstance();

 

后记

当前已完成流程引擎与上下文模块的开发,这两者构成整个流程控制系统的核心,下一步将继续推进流程单步(FlowStep)的设计与实现。后续也会涵盖状态机、事件系统等子模块的开发。相关源代码已托管至 GitHub,欢迎感兴趣的朋友查看与提出建议。

 欢迎关注专栏查看以往内容以及后续更新。构建工业自动化软件框架

代码放在了Github上,欢迎围观,欢迎 Star/Fork/ 提Issue。 Sophon_Github地址

知乎公众号同步更新,欢迎关注公众号。

 

赞(0)
未经允许不得转载:网硕互联帮助中心 » 构建工业自动化软件框架:流程核心搭建(一) 流程引擎、上下文实现
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!