Commit 0624bf84 by lizefeng

新增库存差异计算服务

parent 6f8ceb22
using AutoTurnOver.Models;
using AutoTurnOver.Utility;
using System;
using System.Linq;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace AutoTurnOver.DB
{
public class dc_ana_deviation_dao : connectionHelper
{
/// <summary>
/// 分析差异数据
/// </summary>
/// <param name="taskDto"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public static Task<int> AnaDeviation(t_task_queue taskDto)
{
throw new NotImplementedException();
}
/// <summary>
/// 添加分析任务
/// </summary>
public static void PushAnaTask()
{
// 查询所有需要分析的sku
var skus = _connection.Query<int>(" select id from dc_ana_deviation_sku ").ToList();
foreach (var item in skus)
{
RabbitMQHelper.EnqueneMsg("aims:deviation:input",new t_task_queue {id = item.ToString(),create_time= DateTime.Now });
}
}
}
}
......@@ -14,6 +14,6 @@ namespace AutoTurnOver.DB
public string platform { get; set; }
public string site { get; set; }
public string account { get; set; }
public string product { get; set; }
public string project { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace AutoTurnOver.Models
{
public class t_task_queue
{
public string id { get; set; }
public DateTime create_time { get; set; }
}
}
......@@ -17,6 +17,8 @@
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
<PackageReference Include="Qiniu.SDK" Version="8.0.0" />
<PackageReference Include="StackExchange.Redis" Version="2.1.58" />
<PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.1.0" />
</ItemGroup>
</Project>
\ No newline at end of file
using OfficeOpenXml.Packaging.Ionic.Zlib;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;
using System.Threading;
namespace AutoTurnOver.Utility
{
public class RabbitMQHelper
{
private static ConnectionFactory connectionFactory = new ConnectionFactory
{
HostName = ConfigHelper.GetValue("RabbitMQ:Host"),
UserName = ConfigHelper.GetValue("RabbitMQ:UserName"),
Password = ConfigHelper.GetValue("RabbitMQ:Password"),
Port = int.Parse(ConfigHelper.GetValue("RabbitMQ:Port")),
};
private static IConnection connection;
private RabbitMQHelper() { }
private static void CreateConn()
{
Console.WriteLine("HostName:" + connectionFactory.HostName);
Console.WriteLine("UserName:" + connectionFactory.UserName);
Console.WriteLine("Password:" + connectionFactory.Password);
connection = connectionFactory.CreateConnection();
}
/// <summary>
/// 发送多条消息-指定Mq
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="queName"></param>
/// <param name="msg"></param>
public static bool SendMessagesToTargetMq<T>(string queName, ConnectionFactory connectionFactory, List<T> msgs) where T : class
{
if (msgs == null && !msgs.Any())
{
return false;
}
try
{
var newConnection = connectionFactory.CreateConnection();
using (var channel = newConnection.CreateModel())
{
//声明一个队列
channel.QueueDeclare(queName, true, false, false, null);
var basicProperties = channel.CreateBasicProperties();
//1:非持久化 2:可持久化
basicProperties.DeliveryMode = 2;
var address = new PublicationAddress(ExchangeType.Direct, "", queName);
msgs.ForEach((msg) =>
{
var payload = Encoding.UTF8.GetBytes(msg.ToJson());
channel.BasicPublish(address, basicProperties, payload);
});
}
newConnection.Close();
newConnection.Dispose();
return true;
}
catch (Exception ex)
{
var gewa = ex;
return false;
}
}
/// <summary>
/// 发送消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="queName"></param>
/// <param name="msg"></param>
public static bool SendMsg<T>(string exchangeName, string queName, T msg, bool doCompress = false) where T : class
{
if (msg == null)
{
return false;
}
try
{
if (connection == null || !connection.IsOpen)
{
CreateConn();
}
using (var channel = connection.CreateModel())
{
//声明一个队列
channel.QueueDeclare(queName, true, false, false, null);
if (!string.IsNullOrEmpty(exchangeName))
{
//声明交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true);
//绑定队列,交换机,路由键
channel.QueueBind(queName, exchangeName, queName);
}
var basicProperties = channel.CreateBasicProperties();
//1:非持久化 2:可持久化
basicProperties.DeliveryMode = 2;
var inputBytes = Encoding.UTF8.GetBytes(msg.ToJson());
var payload = doCompress ? CompressZlipBytes(inputBytes) : inputBytes;
var address = new PublicationAddress(ExchangeType.Direct, exchangeName, queName);
channel.BasicPublish(address, basicProperties, payload);
}
return true;
}
catch (Exception)
{
return false;
}
}
/// <summary>
/// 发送多条消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="queName"></param>
/// <param name="msg"></param>
public static bool SendMessages<T>(string exchangeName, string queName, List<T> msgs, bool doCompress = false) where T : class
{
if (msgs == null && !msgs.Any())
{
return false;
}
try
{
if (connection == null || !connection.IsOpen)
{
CreateConn();
}
using (var channel = connection.CreateModel())
{
//声明一个队列
channel.QueueDeclare(queName, true, false, false, null);
if (!string.IsNullOrEmpty(exchangeName))
{
//声明交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true);
//绑定队列,交换机,路由键
channel.QueueBind(queName, exchangeName, queName);
}
var basicProperties = channel.CreateBasicProperties();
//1:非持久化 2:可持久化
basicProperties.DeliveryMode = 2;
var address = new PublicationAddress(ExchangeType.Direct, exchangeName, queName);
msgs.ForEach((msg) =>
{
var inputBytes = Encoding.UTF8.GetBytes(msg.ToJson());
var payload = doCompress ? CompressZlipBytes(inputBytes) : inputBytes;
channel.BasicPublish(address, basicProperties, payload);
});
}
return true;
}
catch (Exception ex)
{
throw ex;
}
}
/// <summary>
/// 消息加入队列中
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="queneName"></param>
/// <param name="msg"></param>
/// <returns></returns>
public static bool EnqueneMsg<T>(string queneName, T msg, bool doCompress = false) where T : class
{
if (msg == null)
{
return false;
}
for (int i = 0; i < 3; i++)
{
var pushMsgResult = SendMsg("", queneName, msg, doCompress);
if (pushMsgResult) return true;
Thread.Sleep(100);
}
return false;
}
public static bool EnqueneMessages<T>(string queneName, List<T> msgs, bool doCompress = false) where T : class
{
if (msgs == null && !msgs.Any())
{
return false;
}
for (int i = 0; i < 3; i++)
{
var pushMsgResult = SendMessages("", queneName, msgs, doCompress);
if (pushMsgResult) return true;
Thread.Sleep(100);
}
return false;
}
public static bool GetMessageCount(string queName, out uint count)
{
count = 0;
try
{
if (connection == null || !connection.IsOpen)
{
CreateConn();
}
using (var channel = connection.CreateModel())
{
//声明一个队列
channel.QueueDeclare(queName, true, false, false, null);
count = channel.MessageCount(queName);
}
return true;
}
catch (Exception ex)
{
var gewa = ex;
return false;
}
}
public static IModel GetChannel()
{
if (connection == null || !connection.IsOpen)
{
CreateConn();
}
var channel = connection.CreateModel();
return channel;
}
/// <summary>
/// 消费消息
/// </summary>
/// <param name="queName"></param>
/// <param name="received"></param>
public static void Receive<T>(string exchangeName, string queName, IModel channel, Action<T> received, bool isZip) where T : class
{
try
{
{
//声明一个队列
channel.QueueDeclare(queName, true, false, false, null);
if (!string.IsNullOrEmpty(exchangeName))
{
//声明交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true);
//绑定队列,交换机,路由键
channel.QueueBind(queName, exchangeName, queName);
}
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
//事件基本消费者
var consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var temp = isZip ? DeconpressZlip(ea.Body.ToArray()) : ea.Body.ToArray();
string message = Encoding.UTF8.GetString(temp);
var msg = message.ToObj<T>();
DateTime time = DateTime.Now;
received(msg);
var timeEnd = DateTime.Now - time;
//channel.DefaultConsumer.HandleBasicCancelOk(consumer.ConsumerTag);
if (channel.IsClosed)
{
return;
}
Console.WriteLine($"{DateTime.Now} 任务执行完成,用时 {timeEnd.TotalSeconds:0.00}s {queName} 队列剩余任务数量: {channel.MessageCount(queName)}");
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false);
};
//启动消费者 设置为手动应答消息
channel.BasicConsume(queName, false, consumer);
}
}
catch (Exception ex)
{
throw ex;
}
Thread.Sleep(60);
}
/// <summary>
/// 释放链接
/// </summary>
public void Dispose()
{
//channel.Dispose();
connection.Close();
}
/// <summary>
/// 关闭连接
/// </summary>
public void ReceiveClose()
{
//channel.BasicCancel(consumer.ConsumerTag);
// channel.Close();
//Dispose();
}
/// <summary>
/// 解压zlip压缩二进制
/// </summary>
/// <param name="bytes"></param>
/// <returns></returns>
public static byte[] DeconpressZlip(byte[] bytes)
{
using (var compressStream = new MemoryStream(bytes))
{
using (var zipStream = new ZlibStream(compressStream, OfficeOpenXml.Packaging.Ionic.Zlib.CompressionMode.Decompress))
{
using (var resultStream = new MemoryStream())
{
zipStream.CopyTo(resultStream);
return resultStream.ToArray();
}
}
}
}
//zlip压缩字节
//1.创建压缩的数据流
//2.设定compressStream为存放被压缩的文件流,并设定为压缩模式
//3.将需要压缩的字节写到被压缩的文件流
public static byte[] CompressZlipBytes(byte[] bytes)
{
using (MemoryStream compressStream = new MemoryStream())
{
using (var zipStream = new ZlibStream(compressStream, OfficeOpenXml.Packaging.Ionic.Zlib.CompressionMode.Compress))
{
zipStream.Write(bytes, 0, bytes.Length);
return compressStream.ToArray();
}
}
}
}
}
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
namespace AutoTurnOver.Utility
{
public class RabbitWorkerBase<T> : BackgroundService where T : class
{
private List<IModel> _channelList = new List<IModel>();
private string _taskName;
private string _queueName;
private int _exampleNumber;
private Timer _timer;
private bool _isZip;
public RabbitWorkerBase(string taskName, string queueName, int exampleNumber, bool isZip = false)
{
_taskName = taskName;
_queueName = queueName;
_exampleNumber = exampleNumber;
_isZip = isZip;
}
private void TimerDoWork(object state)
{
int changeExampleNumber = 2;
//for (int i = 0; i < _channelList.Count; i++)
//{
// var channel = _channelList[0];
// channel.Close();
// channel.Dispose();
// _channelList.Remove(channel);
//}
while (_channelList.Count >= 1)
{
var channel = _channelList[0];
channel.Close();
channel.Dispose();
_channelList.Remove(channel);
}
for (int i = 0; i < changeExampleNumber; i++)
{
var channel = RabbitMQHelper.GetChannel();
RabbitMQHelper.Receive<T>("", _queueName, channel, (taskDto) => DoWorkAsync(taskDto).GetAwaiter().GetResult(), _isZip);
_channelList.Add(channel);
}
}
protected virtual Task DoWork(T taskDto)
{
return Task.CompletedTask;
}
private async Task DoWorkAsync(T taskDto)
{
try
{
await DoWork(taskDto);
}
catch (Exception ex)
{
Console.WriteLine($"异常:{ex.Message},详细{ex.StackTrace}");
}
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine($"{_taskName}已启动");
for (int i = 0; i < _exampleNumber; i++)
{
var channel = RabbitMQHelper.GetChannel();
RabbitMQHelper.Receive<T>("", _queueName, channel, (taskDto) => DoWorkAsync(taskDto).GetAwaiter().GetResult(), _isZip);
_channelList.Add(channel);
}
_timer = new Timer(TimerDoWork, null, TimeSpan.FromMinutes(120), TimeSpan.FromMinutes(120));
return Task.CompletedTask;
}
public override void Dispose()
{
base.Dispose();
_timer?.Dispose();
}
}
}
using AutoTurnOver.DB;
using AutoTurnOver.Models;
using AutoTurnOver.Utility;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace ResetOutofstock
{
class DeviationRabbitBackgroundService : RabbitWorkerBase<t_task_queue>
{
public DeviationRabbitBackgroundService() : base("差异分析服务", "aims:deviation:input", 10)
{
}
protected override async Task DoWork(t_task_queue taskDto)
{
try
{
int hit = await dc_ana_deviation_dao.AnaDeviation(taskDto);
Console.WriteLine($"差异分析服务 完成 {hit} ");
}
catch (Exception e)
{
RabbitMQHelper.EnqueneMsg("aims:deviation:input", taskDto);
Console.WriteLine($"差异分析服务 数据异常,异常原因为:{e.Message},异常堆栈为:{e.StackTrace}");
}
}
}
}
......@@ -50,6 +50,7 @@ namespace ResetOutofstock
//report_invest_return_dao.ShareAdFee();
//report_invest_return_dao.SynchBtmOrderRefund();
//report_invest_return_dao.CalculationStockScore("962073701");
//dc_ana_deviation_dao.PushAnaTask();
}
catch (Exception ex)
{
......@@ -64,6 +65,7 @@ namespace ResetOutofstock
services.AddHostedService<CaseFlowBackgrounService>();
services.AddHostedService<StockBackgrounService>();
services.AddHostedService<ReportFinanceBackgrounService>();
services.AddHostedService<DeviationRabbitBackgroundService>();
});
await builder.RunConsoleAsync();
......
......@@ -19,7 +19,7 @@ namespace ResetOutofstock
return Task.CompletedTask;
}
private void DoWork(object state)
private async void DoWork(object state)
{
var now = DateTime.Now;
......@@ -71,6 +71,18 @@ namespace ResetOutofstock
//dc_report_finance_dao.CalculationAccounts(DateTime.Now.AddDays(-90));
Console.WriteLine($"结束 财报应首付账款,线程Id:{Thread.CurrentThread.ManagedThreadId}{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
}
if (now.Hour == 12 && now.Minute == 30)
{
Console.WriteLine($"开始 计算库存差异 ,线程Id:{Thread.CurrentThread.ManagedThreadId}{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
dc_ana_deviation_dao.PushAnaTask();
Console.WriteLine($"结束 计算库存差异,线程Id:{Thread.CurrentThread.ManagedThreadId}{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
}
if (now.Hour == 23 && now.Minute == 30)
{
Console.WriteLine($"开始 计算库存差异 ,线程Id:{Thread.CurrentThread.ManagedThreadId}{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
dc_ana_deviation_dao.PushAnaTask();
Console.WriteLine($"结束 计算库存差异,线程Id:{Thread.CurrentThread.ManagedThreadId}{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
}
if (now.Hour == 23 && now.Minute == 50)
{
Console.WriteLine($"开始 备份投资回报分析表 ,线程Id:{Thread.CurrentThread.ManagedThreadId}{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
......
......@@ -53,5 +53,11 @@
"btm-sys": {
"ad-fee": "http://btm.bailuntec.com/ScrmApi/api/ApiGetAdFeeDayFull",
"order-refund": "http://btm.bailuntec.com/ScrmApi/api/ApiGetOrderRefundLog"
},
"RabbitMQ": {
"Host": "rabbitmq.bailuntec.com",
"Port": "6783",
"UserName": "bailun",
"Password": "bailun2022"
}
}
......@@ -54,5 +54,11 @@
"btm-sys": {
"ad-fee": "http://btm.bailuntec.com/ScrmApi/api/ApiGetAdFeeDayFull",
"order-refund": "http://btm.bailuntec.com/ScrmApi/api/ApiGetOrderRefundLog"
},
"RabbitMQ": {
"Host": "10.0.6.36",
"UserName": "bailun",
"Password": "bailun2022",
"Port": "6783"
}
}
......@@ -53,6 +53,12 @@
"btm-sys": {
"ad-fee": "http://btm.bailuntec.com/ScrmApi/api/ApiGetAdFeeDayFull",
"order-refund": "http://btm.bailuntec.com/ScrmApi/api/ApiGetOrderRefundLog"
},
"RabbitMQ": {
"Host": "10.0.6.36",
"UserName": "bailun",
"Password": "bailun2022",
"Port": "6783"
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment