Commit 150e144d by huluobin

# 更新

parent cb741b84
...@@ -2,9 +2,11 @@ package com.bailuntec.mapper; ...@@ -2,9 +2,11 @@ package com.bailuntec.mapper;
import com.bailuntec.domain.entity.DcBaseQueue; import com.bailuntec.domain.entity.DcBaseQueue;
import com.bailuntec.domain.example.DcBaseQueueExample; import com.bailuntec.domain.example.DcBaseQueueExample;
import java.util.List; import com.dangdang.ddframe.job.api.ShardingContext;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface DcBaseQueueMapper { public interface DcBaseQueueMapper {
/** /**
* This method was generated by MyBatis Generator. * This method was generated by MyBatis Generator.
...@@ -164,5 +166,5 @@ public interface DcBaseQueueMapper { ...@@ -164,5 +166,5 @@ public interface DcBaseQueueMapper {
int upsertWithBLOBs(DcBaseQueue record); int upsertWithBLOBs(DcBaseQueue record);
List<DcBaseQueue> poll(int shardingItem); List<DcBaseQueue> poll(@Param("shardingContext") ShardingContext shardingContext);
} }
...@@ -570,7 +570,8 @@ ...@@ -570,7 +570,8 @@
select * select *
from dc_base_queue from dc_base_queue
where consume_date is null where consume_date is null
and id % 8 = #{shardingItem} and id % #{shardingContext.shardingTotalCount} = #{shardingContext.shardingItem}
limit 1000 limit 1000
</select> </select>
</mapper> </mapper>
...@@ -48,7 +48,7 @@ public class QueueConsumerJob implements SimpleJob { ...@@ -48,7 +48,7 @@ public class QueueConsumerJob implements SimpleJob {
DcBaseQueueMapper dcBaseQueueMapper = sqlSession.getMapper(DcBaseQueueMapper.class); DcBaseQueueMapper dcBaseQueueMapper = sqlSession.getMapper(DcBaseQueueMapper.class);
DcBaseStockMapper dcBaseStockMapper = sqlSession.getMapper(DcBaseStockMapper.class); DcBaseStockMapper dcBaseStockMapper = sqlSession.getMapper(DcBaseStockMapper.class);
List<DcBaseQueue> dcBaseQueueList = dcBaseQueueMapper.poll(shardingContext.getShardingItem()); List<DcBaseQueue> dcBaseQueueList = dcBaseQueueMapper.poll(shardingContext);
dcBaseQueueList.parallelStream().forEach(dcBaseQueue -> { dcBaseQueueList.parallelStream().forEach(dcBaseQueue -> {
BaseQueueMessage baseQueueMessage = JSON.parseObject(dcBaseQueue.getMessage(), BaseQueueMessage.class); BaseQueueMessage baseQueueMessage = JSON.parseObject(dcBaseQueue.getMessage(), BaseQueueMessage.class);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment