Commit 224c00fd by wutong

OMS,SKUMS, 调拨数据分页查询修改, 避免漏数据.

parent 2c4348c4
......@@ -35,81 +35,62 @@ public class OrderSyncJob extends PointJob {
@Override
public void executeJob(ShardingContext shardingContext, JobPointLog jobPointLog) {
//如果页号为0, 就表示上次查完了, 这次要从总页数开始查, 如果页号 > 0, 就降序查, 防止漏单
if (jobPointLog.getPageIndex() == 0) {
jobPointLog.setPageIndex(requestBailunOrder(jobPointLog));
}
if (jobPointLog.getPageIndex() > 0) {
do {
requestBailunOrder(jobPointLog);
jobPointLog.setPageIndex(jobPointLog.getPageIndex() - 1);
if (jobPointLog.getPageIndex() % 10 == 0) {
try {
JobPointLogMapper mapper = SessionUtil.getSession().getMapper(JobPointLogMapper.class);
mapper.upsertSelective(jobPointLog);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Mybatis操作DB失败", e);
} finally {
SessionUtil.closeSession();
}
}
} while (0 < jobPointLog.getPageIndex());
}
if (jobPointLog.getPageIndex() == 0) {
jobPointLog.setStartTime(jobPointLog.getEndTime());
jobPointLog.setEndTime(jobPointLog.getEndTime().plusDays(jobPointLog.getIntervalTime()).isAfter(LocalDateTime.now()) ? LocalDateTime.now() : jobPointLog.getEndTime().plusDays(jobPointLog.getIntervalTime()));
}
}
/**
* 请求OMS接口
*
* @param jobPointLog
* @return
*/
private Integer requestBailunOrder(JobPointLog jobPointLog) {
Integer totalPages = null;
LinkedHashMap<String, String> map = new LinkedHashMap<>(4);
map.put("pageIndex", jobPointLog.getPageIndex() == 0 ? "1" : jobPointLog.getPageIndex().toString());
map.put("pageCount", jobPointLog.getPageSize().toString());
//时间回退一点, 避免服务器时间不一致而漏单
map.put("BailunLastUpdateTimeFrom", DateTimeFormatter.ofPattern(CommonConstant.TIME_FORMAT).format(jobPointLog.getStartTime().minusMinutes(5)));
map.put("BailunLastUpdateTimeFrom", DateTimeFormatter.ofPattern(CommonConstant.TIME_FORMAT).format(jobPointLog.getStartTime().minusMinutes(3)));
map.put("BailunLastUpdateTimeTo", DateTimeFormatter.ofPattern(CommonConstant.TIME_FORMAT).format(jobPointLog.getEndTime()));
Response response = null;
String omsResultStr = null;
try {
Request request = new Request.Builder()
.get()
.url(OkHttpUtil.attachHttpGetParams(propertiesUtil.getPropertyAsString("BAILUNORDER_URL"), map))
.addHeader("Content-Type", "application/json")
.build();
response = okHttpClient.newCall(request).execute();
omsResultStr = response.body().string();
} catch (IOException e) {
throw new RuntimeException(map + "请求OMS接口同步百伦接口失败" + response, e);
} finally {
if (response != null) {
response.close();
do {
map.put("pageIndex", jobPointLog.getPageIndex().equals(0) ? "1" : jobPointLog.getPageIndex().toString());
Response response = null;
String omsResultStr = null;
try {
Request request = new Request.Builder()
.get()
.url(OkHttpUtil.attachHttpGetParams(propertiesUtil.getPropertyAsString("BAILUNORDER_URL"), map))
.addHeader("Content-Type", "application/json")
.build();
response = okHttpClient.newCall(request).execute();
omsResultStr = response.body().string();
} catch (IOException e) {
throw new RuntimeException(map + "请求OMS接口同步百伦接口失败" + response, e);
} finally {
if (response != null) {
response.close();
}
}
}
if (StringUtils.isNoneBlank(omsResultStr)) {
OmsResultRoot omsResultRoot = JSON.parseObject(omsResultStr, OmsResultRoot.class);
if (omsResultRoot != null && omsResultRoot.getSuccess().booleanValue()) {
OmsResultInfo omsResultInfo = omsResultRoot.getResult();
totalPages = omsResultInfo.getTotalPages();
if (omsResultInfo.getResult() != null && omsResultInfo.getResult().size() > 0) {
analyseOmsOrder(omsResultInfo.getResult());
if (StringUtils.isNoneBlank(omsResultStr)) {
OmsResultRoot omsResultRoot = JSON.parseObject(omsResultStr, OmsResultRoot.class);
if (omsResultRoot != null && omsResultRoot.getSuccess().booleanValue()) {
OmsResultInfo omsResultInfo = omsResultRoot.getResult();
if (jobPointLog.getPageIndex().equals(0)) {
jobPointLog.setPageIndex(omsResultInfo.getTotalPages() + 1);
}
if (omsResultInfo.getResult() != null && omsResultInfo.getResult().size() > 0) {
analyseOmsOrder(omsResultInfo.getResult());
}
} else {
throw new RuntimeException("调用OMS接口同步百伦订单失败, 响应200, 请求参数" + map.toString());
}
} else {
throw new RuntimeException("调用OMS接口同步百伦订单失败, 响应200, 请求参数" + map.toString());
throw new RuntimeException("调用OMS接口同步百伦订单失败, 响应为null, 请求参数" + map.toString());
}
} else {
throw new RuntimeException("调用OMS接口同步百伦订单失败, 响应为null, 请求参数" + map.toString());
}
//如果PageIndex等于0取总页数, 否则取PageIndex
return jobPointLog.getPageIndex() == 0 ? totalPages : jobPointLog.getPageIndex();
if (jobPointLog.getPageIndex() % 10 == 0) {
try {
JobPointLogMapper mapper = SessionUtil.getSession().getMapper(JobPointLogMapper.class);
mapper.upsertSelective(jobPointLog);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Mybatis操作DB插入任务记录失败", e);
} finally {
SessionUtil.closeSession();
}
}
jobPointLog.setPageIndex(jobPointLog.getPageIndex() - 1);
} while (0 < jobPointLog.getPageIndex());
jobPointLog.setPageIndex(0);
jobPointLog.setStartTime(jobPointLog.getEndTime());
jobPointLog.setEndTime(jobPointLog.getEndTime().plusDays(jobPointLog.getIntervalTime()).isAfter(LocalDateTime.now()) ? LocalDateTime.now() : jobPointLog.getEndTime().plusDays(jobPointLog.getIntervalTime()));
}
/**
......
......@@ -9,7 +9,7 @@ EVENT_RDB_STORAGE_USERNAME=root
EVENT_RDB_STORAGE_PASSWORD=#7kfnymAM$Y9-Ntf
ZOOKEEPER_SERVER=172.31.255.120:2181
NAME_SPACE=data-center
JOB_NAME=base-sync-oms-order-coroutine
#JOB_NAME=base-sync-oms-order
#JOB_NAME=base-sync-oms-order-coroutine
JOB_NAME=base-sync-oms-order
JOB_CRON=0/1 * * * * ? *
SHARDING_TOTAL_COUNT=1
\ No newline at end of file
......@@ -68,7 +68,7 @@ public class SkuMSSyncJob extends PointJob {
handleSkuMsJson(data,jobPointLog);
}
if (jobPointLog.getPageIndex().equals(0) && result.getTotalPage() != null) {
jobPointLog.setPageIndex(result.getTotalPage());
jobPointLog.setPageIndex(result.getTotalPage() + 1);
}
}
} else {
......
......@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bailuntec.domain.constant.CommonConstant;
import com.bailuntec.domain.entity.*;
import com.bailuntec.domain.example.*;
import com.bailuntec.domain.pojo.TransferDetailsPOJO;
import com.bailuntec.domain.pojo.TransferStreamPOJO;
import com.bailuntec.mapper.*;
......@@ -31,10 +32,8 @@ public class TransferDetailsServiceImpl {
map.put("EndDate", DateTimeFormatter.ofPattern(CommonConstant.TIME_FORMAT).format(jobPointLog.getEndTime()));
map.put("State", jobPointLog.getType());
map.put("PageRow", jobPointLog.getPageSize());
Integer page = 1;
Integer pagetotal = 0;
do {
map.put("CurrentPage", page);
map.put("CurrentPage", jobPointLog.getPageIndex() > 0?jobPointLog.getPageIndex() : 1);
Response response = null;
String responseStr = null;
try {
......@@ -57,6 +56,9 @@ public class TransferDetailsServiceImpl {
JSONObject jsonObject = JSON.parseObject(responseStr);
if (jsonObject.get("isSuccess") != null && jsonObject.getBooleanValue("isSuccess")) {
TransferDetailsPOJO transferDetailsPOJO = jsonObject.getObject("data", TransferDetailsPOJO.class);
if (jobPointLog.getPageIndex().equals(0)) {
jobPointLog.setPageIndex(transferDetailsPOJO.getPageCount() + 1);
}
if (transferDetailsPOJO != null && transferDetailsPOJO.getData() != null && transferDetailsPOJO.getData().size() > 0) {
switch (jobPointLog.getType()) {
case 1:
......@@ -69,8 +71,11 @@ public class TransferDetailsServiceImpl {
}
try {
DcBaseTransferBaleMapper mapper1 = SessionUtil.getSession().getMapper(DcBaseTransferBaleMapper.class);
mapper1.upsertSelective(dcBaseTransferBale);
SessionUtil.getSession().commit();
int i = mapper1.updateByExampleSelective(dcBaseTransferBale, DcBaseTransferBaleExample.newAndCreateCriteria().andBailunSkuEqualTo(dcBaseTransferBale.getBailunSku())
.andTransferOrderIdEqualTo(dcBaseTransferBale.getTransferOrderId()).example());
if (i == 0) {
mapper1.insertSelective(dcBaseTransferBale);
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("MYbatis操作DB失败", e);
......@@ -89,8 +94,11 @@ public class TransferDetailsServiceImpl {
}
try {
DcBaseTransferDeliveryMapper mapper2 = SessionUtil.getSession().getMapper(DcBaseTransferDeliveryMapper.class);
mapper2.upsertSelective(dcBaseTransferDelivery);
SessionUtil.getSession().commit();
int i = mapper2.updateByExampleSelective(dcBaseTransferDelivery, DcBaseTransferDeliveryExample.newAndCreateCriteria().andBailunSkuEqualTo(dcBaseTransferDelivery.getBailunSku())
.andTransferOrderIdEqualTo(dcBaseTransferDelivery.getTransferOrderId()).example());
if (i == 0) {
mapper2.insertSelective(dcBaseTransferDelivery);
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("MYbatis操作DB失败", e);
......@@ -110,8 +118,11 @@ public class TransferDetailsServiceImpl {
}
try {
DcBaseTransferVerifyMapper mapper3 = SessionUtil.getSession().getMapper(DcBaseTransferVerifyMapper.class);
mapper3.upsertSelective(dcBaseTransferVerify);
SessionUtil.getSession().commit();
int i = mapper3.updateByExampleSelective(dcBaseTransferVerify, DcBaseTransferVerifyExample.newAndCreateCriteria().andBailunSkuEqualTo(dcBaseTransferVerify.getBailunSku())
.andTransferOrderIdEqualTo(dcBaseTransferVerify.getTransferOrderId()).example());
if (i == 0) {
mapper3.insertSelective(dcBaseTransferVerify);
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("MYbatis操作DB失败", e);
......@@ -130,8 +141,11 @@ public class TransferDetailsServiceImpl {
}
try {
DcBaseTransferTransitMapper mapper4 = SessionUtil.getSession().getMapper(DcBaseTransferTransitMapper.class);
mapper4.upsertSelective(dcBaseTransferTransit);
SessionUtil.getSession().commit();
int i = mapper4.updateByExampleSelective(dcBaseTransferTransit, DcBaseTransferTransitExample.newAndCreateCriteria().andBailunSkuEqualTo(dcBaseTransferTransit.getBailunSku())
.andTransferOrderIdEqualTo(dcBaseTransferTransit.getTransferOrderId()).example());
if (i == 0) {
mapper4.insertSelective(dcBaseTransferTransit);
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("MYbatis操作DB失败", e);
......@@ -150,8 +164,11 @@ public class TransferDetailsServiceImpl {
}
try {
DcBaseTransferCompleteMapper mapper5 = SessionUtil.getSession().getMapper(DcBaseTransferCompleteMapper.class);
mapper5.upsertSelective(dcBaseTransferComplete);
SessionUtil.getSession().commit();
int i = mapper5.updateByExampleSelective(dcBaseTransferComplete, DcBaseTransferCompleteExample.newAndCreateCriteria().andBailunSkuEqualTo(dcBaseTransferComplete.getBailunSku())
.andTransferOrderIdEqualTo(dcBaseTransferComplete.getTransferOrderId()).example());
if (i == 0) {
mapper5.insertSelective(dcBaseTransferComplete);
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("MYbatis操作DB失败", e);
......@@ -170,8 +187,11 @@ public class TransferDetailsServiceImpl {
}
try {
DcBaseTransferInboundMapper mapper12 = SessionUtil.getSession().getMapper(DcBaseTransferInboundMapper.class);
mapper12.upsertSelective(dcBaseTransferInbound);
SessionUtil.getSession().commit();
int i = mapper12.updateByExampleSelective(dcBaseTransferInbound, DcBaseTransferInboundExample.newAndCreateCriteria().andBailunSkuEqualTo(dcBaseTransferInbound.getBailunSku())
.andTransferOrderIdEqualTo(dcBaseTransferInbound.getTransferOrderId()).example());
if (i == 0) {
mapper12.insertSelective(dcBaseTransferInbound);
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("MYbatis操作DB失败", e);
......@@ -182,14 +202,26 @@ public class TransferDetailsServiceImpl {
break;
}
}
jobPointLog.setStartTime(jobPointLog.getEndTime());
jobPointLog.setEndTime(jobPointLog.getStartTime().plusDays(jobPointLog.getIntervalTime().longValue()).isAfter(LocalDateTime.now()) ? LocalDateTime.now() : jobPointLog.getStartTime().plusDays(jobPointLog.getIntervalTime().longValue()));
} else {
throw new RuntimeException("调用调拨流水接口返回200但是失败");
}
} else {
throw new RuntimeException("调用调拨流水接口失败");
}
page++;
} while (page <= pagetotal);
if (jobPointLog.getPageIndex() % 5 == 0) {
try {
JobPointLogMapper mapper = SessionUtil.getSession().getMapper(JobPointLogMapper.class);
mapper.upsertSelective(jobPointLog);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Mybatis操作DB插入任务记录失败", e);
} finally {
SessionUtil.closeSession();
}
}
jobPointLog.setPageIndex(jobPointLog.getPageIndex() - 1);
} while (jobPointLog.getPageIndex() > 0);
jobPointLog.setPageIndex(0);
jobPointLog.setStartTime(jobPointLog.getEndTime());
jobPointLog.setEndTime(jobPointLog.getStartTime().plusDays(jobPointLog.getIntervalTime().longValue()).isAfter(LocalDateTime.now()) ? LocalDateTime.now() : jobPointLog.getStartTime().plusDays(jobPointLog.getIntervalTime().longValue()));
}
......
......@@ -8,9 +8,9 @@ public class TransferDetailsTest {
@Test
public void test() {
JobPointLog jobPointLog = new JobPointLog();
jobPointLog.setStartTime(LocalDateTime.of(2019,02,12,17,13,18));
jobPointLog.setEndTime(LocalDateTime.of(2019,02,13,17,13,19));
jobPointLog.setType(3);
jobPointLog.setStartTime(LocalDateTime.of(2019,04,17,14,53,14));
jobPointLog.setEndTime(LocalDateTime.of(2019,04,17,14,53,15));
jobPointLog.setType(5);
jobPointLog.setIntervalTime(1);
jobPointLog.setPageIndex(1);
jobPointLog.setPageSize(1000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment