IOTCloudBackgroundService.cs 21.9 KB
using HHECS.DAQShared.Models;
using HHECS.DAQWebClient.Data;
using HHECS.DAQWebClient.Model;
using HHECS.DAQWebClient.Models;
using System.Diagnostics;

namespace HHECS.DAQWebClient.Services
{
    public class IOTCloudBackgroundService : BackgroundService
    {
        private readonly IFreeSql<LocalFlag> _freeSql;
        private readonly CommonService _commonService;
        private readonly IOTCloudService _cloudService;

        public IOTCloudBackgroundService(IFreeSql<LocalFlag> freeSql, CommonService commonService, IOTCloudService cloudService)
        {
            _freeSql = freeSql;
            _commonService = commonService;
            _cloudService = cloudService;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //缓存时间,分钟
            const int cacheMinutesTime = 3;
            var _lastAExecution = DateTime.MinValue;
            var _lastBExecution = DateTime.MinValue;
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(1000, stoppingToken);
                    //更新客户端状态
                    if ((DateTime.Now - _lastAExecution) > TimeSpan.FromMinutes(1))
                    {
                        _lastAExecution = DateTime.Now;
                        var clientIdValue = _freeSql.Queryable<LocalConfig>().Where(x => x.Code == ConfigType.ClientId.ToString()).First(x => x.Value);
                        _ = Guid.TryParse(clientIdValue, out var clientId);
                        if (clientId == Guid.Empty)
                        {
                            _commonService.PrintLog($"推送客户端状态数据失败:当前客户端未配置有效ClientId", LogLevel.Warning);
                        }
                        else
                        {
                            var updateStatusResult = await _cloudService.UpdateClientStatusAsync(clientId);
                            if (!updateStatusResult.Success)
                            {
                                _commonService.PrintLog($"推送客户端状态数据失败:{updateStatusResult.Msg}", LogLevel.Error);
                            }
                        }
                    }

                    var commitCountValue = _freeSql.Queryable<LocalConfig>().Where(x => x.Code == ConfigType.CommitCount.ToString()).First(x => x.Value);
                    _ = int.TryParse(commitCountValue, out var _commitCount);
                    if (_commitCount == 0)
                    {
                        //未配置,默认100条
                        _commitCount = 100;
                    }

                    var haveRecord = _freeSql.Queryable<EquipmentDataQueue>().Any();
                    if (!_commonService.EquipmentDataQueues.IsEmpty)
                    {
                        var temps = new List<EquipmentDataQueue>();
                        var commitFailureRecords = new List<EquipmentDataQueue>();
                        //启用数据压缩
                        if (_commonService.DataCompression)
                        {
                            while (!_commonService.EquipmentDataQueues.IsEmpty)
                            {
                                var result = _commonService.EquipmentDataQueues.TryDequeue(out var item);
                                if (item != null)
                                {
                                    temps.Add(item);
                                }
                            }
                        }
                        //未启用数据压缩
                        else
                        {
                            for (int i = 0; i < _commitCount; i++)
                            {
                                var result = _commonService.EquipmentDataQueues.TryDequeue(out var item);
                                if (!result) break;//队列已清空
                                if (item != null)
                                {
                                    var equipmentId = _freeSql.Queryable<Equipment>().Where(x => x.Code == item.EquipmentCode).First(x => x.Id);
                                    var equipmentProp = _freeSql.Queryable<EquipmentProp>().Where(x => x.EquipmentId == equipmentId && x.Code == item.EquipmentPropCode).First();
                                    if (equipmentProp != null)
                                    {
                                        //更新设备属性时间
                                        _freeSql.Update<EquipmentProp>(equipmentProp.Id).Set(x => x.Updated, equipmentProp.Updated).ExecuteAffrows();
                                    }
                                    temps.Add(item);
                                }
                            }

                            //自动上传启用,且数据库无未上传的数据,则直接推送
                            if (_commonService.AutoCommit && !_commonService.DataCompression && !haveRecord)
                            {
                                var tasks = new List<Task<List<EquipmentDataQueue>>>();
                                foreach (var item in temps.GroupBy(x => x.EquipmentCode))
                                {
                                    tasks.Add(Task.Run(async () =>
                                    {
                                        Stopwatch stopwatch = Stopwatch.StartNew();
                                        var records = item.OrderBy(x => x.SourceTimestamp).ToList();
                                        var result = await _cloudService.SendEquipmentDataAsync(records);
                                        if (!result.Success)
                                        {
                                            _commonService.PrintLog($"推送设备[{item.Key}]数据失败,{result.Msg},耗时:{stopwatch.ElapsedMilliseconds}ms", LogLevel.Error);
                                            return records;
                                        }
                                        _commonService.PrintLog($"成功推送{records.Count}条设备[{item.Key}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                                        return new List<EquipmentDataQueue>();
                                    }, stoppingToken));
                                }
                                Task.WaitAll(tasks.ToArray(), stoppingToken);
                                commitFailureRecords = tasks.SelectMany(x => x.Result).ToList();
                            }
                            //自动上传关闭或数据库存在未上传的记录,则需要存入数据库
                            else
                            {
                                commitFailureRecords.AddRange(temps);
                            }

                            //将上传失败的数据存入数据库
                            if (commitFailureRecords.Count > 0)
                            {
                                //节流模式
                                if (_commonService.DataCompression)
                                {
                                    var dictionaryUpdateTemps = new Dictionary<string, EquipmentDataQueue>();
                                    var dictionaryAddTemps = new Dictionary<string, List<EquipmentDataQueue>>();
                                    foreach (var item in commitFailureRecords.GroupBy(x => x.EquipmentCode))
                                    {
                                        var lastRecord = _freeSql.Queryable<EquipmentDataQueue>().Where(x => x.EquipmentCode == item.Key).OrderByDescending(x => x.SourceTimestamp).First();
                                        var addTemps = new List<EquipmentDataQueue>();
                                        dictionaryAddTemps[item.Key] = addTemps;//传递集合引用
                                        foreach (var record in item.OrderBy(x => x.SourceTimestamp).ToList())
                                        {
                                            var lastTemp = lastRecord;
                                            if (addTemps.Count > 0)
                                            {
                                                lastTemp = addTemps.OrderByDescending(x => x.SourceTimestamp).First();
                                            }

                                            if (lastTemp != null)
                                            {
                                                var currentRecordTime = DateTimeOffset.FromUnixTimeMilliseconds(record.SourceTimestamp).LocalDateTime;
                                                var lastRecordTime = (lastTemp.Updated ?? lastTemp.Created!).Value;
                                                if (lastTemp.Value == record.Value && currentRecordTime.Date == lastRecordTime.Date
                                                && (currentRecordTime - lastRecordTime).TotalSeconds <= 5)
                                                {
                                                    lastTemp.Updated = currentRecordTime;

                                                    //是数据库里面的数据,有变化需要更新
                                                    if (lastTemp.Id != Guid.Empty && lastTemp.Id == lastRecord.Id)
                                                    {
                                                        dictionaryUpdateTemps[item.Key] = lastTemp;
                                                    }
                                                    continue;
                                                }
                                            }
                                            //其他情况,新增记录
                                            addTemps.Add(record);
                                        }
                                    }

                                    var allUpdateTemps = dictionaryUpdateTemps.Select(x => x.Value).ToList();
                                    if (allUpdateTemps.Count > 0)
                                    {
                                        _freeSql.Update<EquipmentDataQueue>().SetSource(allUpdateTemps).UpdateColumns(x => x.Updated).ExecuteAffrows();
                                        _commonService.PrintLog($"更新[{allUpdateTemps.Count}]条设备记录");
                                    }

                                    var allAddTemps = dictionaryAddTemps.SelectMany(x => x.Value).ToList();
                                    if (allAddTemps.Count > 0)
                                    {
                                        _freeSql.Insert(allAddTemps).ExecuteAffrows();
                                        _commonService.PrintLog($"新增{allAddTemps.Count}条设备记录");
                                    }
                                }
                                //实时存储
                                else
                                {
                                    _freeSql.Insert(commitFailureRecords).ExecuteAffrows();
                                    _commonService.PrintLog($"新增{commitFailureRecords.Count}条数据记录");
                                }
                                //更新状态
                                haveRecord = _freeSql.Queryable<EquipmentDataQueue>().Any();
                            }
                        }

                        var timeSpan = TimeSpan.FromSeconds(5);//默认5秒一次
                        var equipmentTotal = _freeSql.Queryable<Equipment>().Where(x => !x.Disable).Count();
                        var recordsTotal = _freeSql.Queryable<EquipmentDataQueue>().Count();
                        if (recordsTotal >= equipmentTotal * _commitCount)
                        {
                            //数据较多时,1秒一次
                            timeSpan = TimeSpan.FromSeconds(1);
                        }

                        //推送数据
                        if (_commonService.AutoCommit && (DateTime.Now - _lastBExecution) >= timeSpan && haveRecord)
                        {
                            _lastBExecution = DateTime.Now;
                            var equipmentCodes = _freeSql.Queryable<EquipmentDataQueue>().Distinct().ToList(x => x.EquipmentCode);
                            var commitSuccessRecordIds = new List<Guid>();

                            //是否存在更新时间为空的记录,最后一条记录不算
                            var haveUpdateIsNull = equipmentCodes.Where(equipmentCode =>
                            {
                                //第一条更新时间为空的记录
                                var firstTemp = _freeSql.Queryable<EquipmentDataQueue>().Where(x => x.EquipmentCode == equipmentCode && x.Updated == null).OrderBy(x => x.SourceTimestamp).First(x => x.Id);
                                if (firstTemp == default)
                                {
                                    //无记录
                                    return false;
                                }
                                //最后一条记录
                                var lastTemp = _freeSql.Queryable<EquipmentDataQueue>().Where(x => x.EquipmentCode == equipmentCode).OrderByDescending(x => x.SourceTimestamp).First(x => x.Id);
                                //第一条与最后一条是同一条,则排除
                                if (firstTemp == lastTemp)
                                {
                                    return false;
                                }
                                //存在多条
                                return true;
                            }).Any();

                            //节流模式
                            if (_commonService.DataCompression && !haveUpdateIsNull)
                            {
                                //超过设定时间的数据
                                var records = _freeSql.Queryable<EquipmentDataQueue>().Where(x => DateTime.Now.AddMinutes(-(cacheMinutesTime + 1)) >= x.Created && x.Updated != null).OrderBy(x => x.SourceTimestamp).Take(_commitCount).ToList();
                                if (records.Count == 0)
                                {
                                    continue;
                                }
                                var stopwatch = Stopwatch.StartNew();
                                var result = await _cloudService.SendEquipmentDataV2Async(records);
                                var currentEquipmentCodes = records.Select(x => x.EquipmentCode).Distinct().ToList();
                                if (!result.Success)
                                {
                                    _commonService.PrintLog($"推送设备[{string.Join(',', currentEquipmentCodes)}]数据失败,{result.Msg},耗时:{stopwatch.ElapsedMilliseconds}ms", LogLevel.Error);
                                    continue;
                                }
                                _commonService.PrintLog($"成功推送{records.Count}条设备[{string.Join(',', currentEquipmentCodes)}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                                commitSuccessRecordIds = records.Select(x => x.Id).ToList();
                                _freeSql.Delete<EquipmentDataQueue>().Where(x => commitSuccessRecordIds.Contains(x.Id)).ExecuteAffrows();
                                continue;
                            }

                            //推送成功的数据集合
                            var tasks = new List<Task<List<EquipmentDataQueue>>>();
                            foreach (var equipmentCode in equipmentCodes)
                            {
                                tasks.Add(Task.Run(async () =>
                                {
                                    Stopwatch stopwatch = Stopwatch.StartNew();
                                    var successRecords = new List<EquipmentDataQueue>();
                                    var records = _freeSql.Queryable<EquipmentDataQueue>().Where(x => x.EquipmentCode == equipmentCode && !x.IsCommit).OrderBy(x => x.SourceTimestamp).Take(_commitCount).ToList();
                                    if (records.Count == 0)
                                    {
                                        return successRecords;
                                    }
                                    var temp1s = new List<List<EquipmentDataQueue>>();
                                    var temp2s = new List<EquipmentDataQueue>();
                                    foreach (var record in records)
                                    {
                                        //是最后一条记录,且启用了节流模式
                                        if (_commonService.DataCompression && record == records.Last())
                                        {
                                            //创建时间距离现在的时间未超过设定时间,则不推送
                                            if ((DateTime.Now - record.Created!.Value).TotalMinutes < cacheMinutesTime)
                                            {
                                                break;
                                            }

                                            //队列还有缓存的数据,且持续时间未达到设定值,则不推送
                                            if (!_commonService.EquipmentDataQueues.IsEmpty && record.Updated != null
                                            && (record.Updated - record.Created).Value.TotalMinutes < cacheMinutesTime)
                                            {
                                                break;
                                            }
                                        }

                                        if (temp2s.Count == 0)
                                        {
                                            temp2s.Add(record);
                                            continue;
                                        }

                                        var lastItem = temp2s.Last();
                                        var lastType = lastItem.Updated > lastItem.Created;
                                        var currentType = record.Updated > record.Created;
                                        if (lastType == currentType)
                                        {
                                            temp2s.Add(record);
                                        }
                                        else
                                        {
                                            temp1s.Add(temp2s);
                                            temp2s = new List<EquipmentDataQueue>
                                                {
                                                    record
                                                };
                                        }
                                    }
                                    if (temp2s.Count > 0)
                                    {
                                        temp1s.Add(temp2s);
                                        temp2s = new List<EquipmentDataQueue>();
                                    }

                                    foreach (var item in temp1s)
                                    {
                                        //某段时间的数据
                                        if (item.All(x => x.Updated > x.Created))
                                        {
                                            var result = await _cloudService.SendEquipmentDataV2Async(item);
                                            if (!result.Success)
                                            {
                                                _commonService.PrintLog($"推送设备[{equipmentCode}]数据失败,{result.Msg},耗时:{stopwatch.ElapsedMilliseconds}ms", LogLevel.Error);
                                                break;//推送失败,后续的数据也暂停推送
                                            }
                                            _commonService.PrintLog($"成功推送{records.Count}条设备[{equipmentCode}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                                            successRecords.AddRange(item);
                                            continue;
                                        }

                                        //一秒一条记录的数据
                                        var result2 = await _cloudService.SendEquipmentDataAsync(item);
                                        if (!result2.Success)
                                        {
                                            _commonService.PrintLog($"推送设备[{equipmentCode}]数据失败,{result2.Msg},耗时:{stopwatch.ElapsedMilliseconds}ms", LogLevel.Error);
                                            break;//推送失败,后续的数据也暂停推送
                                        }
                                        _commonService.PrintLog($"成功推送1条设备[{equipmentCode}]数据,耗时:{stopwatch.ElapsedMilliseconds}ms");
                                        successRecords.AddRange(item);
                                    }
                                    return successRecords;
                                }));
                            }
                            Task.WaitAll(tasks.ToArray());
                            commitSuccessRecordIds = tasks.SelectMany(x => x.Result).Select(x => x.Id).ToList();
                            _freeSql.Delete<EquipmentDataQueue>().Where(x => commitSuccessRecordIds.Contains(x.Id)).ExecuteAffrows();
                        }
                    }
                }
                catch (Exception ex)
                {
                    _commonService.PrintLog($"数据上传线程异常:{ex.Message}", LogLevel.Debug);
                }
            }
        }
    }
}