Commit 8f7102f3 by huluobin

# 更新

parent cfa664a1
...@@ -162,4 +162,7 @@ public interface DcBaseQueueMapper { ...@@ -162,4 +162,7 @@ public interface DcBaseQueueMapper {
* @project https://github.com/itfsw/mybatis-generator-plugin * @project https://github.com/itfsw/mybatis-generator-plugin
*/ */
int upsertWithBLOBs(DcBaseQueue record); int upsertWithBLOBs(DcBaseQueue record);
}
\ No newline at end of file
List<DcBaseQueue> poll(int shardingItem);
}
...@@ -5,7 +5,6 @@ import com.alibaba.fastjson.annotation.JSONField; ...@@ -5,7 +5,6 @@ import com.alibaba.fastjson.annotation.JSONField;
import com.bailuntec.domain.constant.CommonConstant; import com.bailuntec.domain.constant.CommonConstant;
import com.bailuntec.domain.entity.DcBaseQueue; import com.bailuntec.domain.entity.DcBaseQueue;
import com.bailuntec.domain.entity.DcBaseStock; import com.bailuntec.domain.entity.DcBaseStock;
import com.bailuntec.domain.example.DcBaseQueueExample;
import com.bailuntec.domain.example.DcBaseStockExample; import com.bailuntec.domain.example.DcBaseStockExample;
import com.bailuntec.mapper.DcBaseQueueMapper; import com.bailuntec.mapper.DcBaseQueueMapper;
import com.bailuntec.mapper.DcBaseStockMapper; import com.bailuntec.mapper.DcBaseStockMapper;
...@@ -45,41 +44,36 @@ public class QueueConsumerJob implements SimpleJob { ...@@ -45,41 +44,36 @@ public class QueueConsumerJob implements SimpleJob {
public void execute(ShardingContext shardingContext) { public void execute(ShardingContext shardingContext) {
try (SqlSession sqlSession = SessionUtil.getFactory().openSession(true)) { try (SqlSession sqlSession = SessionUtil.getFactory().openSession(true)) {
if (shardingContext.getShardingItem() == 0) { AutoTurnoverJob autoTurnoverJob = new AutoTurnoverJob();
AutoTurnoverJob autoTurnoverJob = new AutoTurnoverJob(); DcBaseQueueMapper dcBaseQueueMapper = sqlSession.getMapper(DcBaseQueueMapper.class);
DcBaseQueueMapper dcBaseQueueMapper = sqlSession.getMapper(DcBaseQueueMapper.class); DcBaseStockMapper dcBaseStockMapper = sqlSession.getMapper(DcBaseStockMapper.class);
DcBaseStockMapper dcBaseStockMapper = sqlSession.getMapper(DcBaseStockMapper.class);
List<DcBaseQueue> dcBaseQueueList = dcBaseQueueMapper.selectByExample(DcBaseQueueExample.newAndCreateCriteria() List<DcBaseQueue> dcBaseQueueList = dcBaseQueueMapper.poll(shardingContext.getShardingItem());
.andConsumeDateIsNull()
.example()
.limit(1000));
dcBaseQueueList.forEach(dcBaseQueue -> { dcBaseQueueList.forEach(dcBaseQueue -> {
BaseQueueMessage baseQueueMessage = JSON.parseObject(dcBaseQueue.getMessage(), BaseQueueMessage.class); BaseQueueMessage baseQueueMessage = JSON.parseObject(dcBaseQueue.getMessage(), BaseQueueMessage.class);
DcBaseStock dcBaseStock = dcBaseStockMapper.selectOneByExample(DcBaseStockExample.newAndCreateCriteria() DcBaseStock dcBaseStock = dcBaseStockMapper.selectOneByExample(DcBaseStockExample.newAndCreateCriteria()
.andBailunSkuEqualTo(baseQueueMessage.getBailunSku()) .andBailunSkuEqualTo(baseQueueMessage.getBailunSku())
.andWarehouseCodeEqualTo(baseQueueMessage.getWarehouseCode()) .andWarehouseCodeEqualTo(baseQueueMessage.getWarehouseCode())
.example()); .example());
try {
autoTurnoverJob.autoTurnoverFromStock(DateTimeFormatter.ofPattern(CommonConstant.DATE_FORMAT).format(LocalDate.now()), dcBaseStock);
dcBaseQueueMapper.deleteByPrimaryKey(dcBaseQueue.getId());
} catch (Exception e) {
try { try {
autoTurnoverJob.autoTurnoverFromStock(DateTimeFormatter.ofPattern(CommonConstant.DATE_FORMAT).format(LocalDate.now()), dcBaseStock); dcBaseQueue.setQueueType(2);
dcBaseQueueMapper.deleteByPrimaryKey(dcBaseQueue.getId()); dcBaseQueue.setErrorMessage(e.getMessage());
} catch (Exception e) { dcBaseQueue.setErrorStackTrace(Arrays.toString(e.getStackTrace()));
try { dcBaseQueueMapper.updateByPrimaryKey(dcBaseQueue);
dcBaseQueue.setQueueType(2); } catch (Exception ex) {
dcBaseQueue.setErrorMessage(e.getMessage()); log.error(ex.getMessage());
dcBaseQueue.setErrorStackTrace(Arrays.toString(e.getStackTrace()));
dcBaseQueueMapper.updateByPrimaryKey(dcBaseQueue);
} catch (Exception ex) {
log.error(ex.getMessage());
}
} finally {
log.info("消费一条数据 message:{}", dcBaseQueue.getMessage());
} }
} finally {
log.info("消费一条数据 message:{}", dcBaseQueue.getMessage());
}
}); });
}
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment