Commit 0c01ef59 by yinyong

资料库新增旧数据同步服务

parent 30d088cb
package com.bailuntec; package com.bailuntec;
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidDataSource;
import com.bailuntec.job.SkuCoroutineMSSyncJob;
import com.bailuntec.job.SkuMSSyncJob; import com.bailuntec.job.SkuMSSyncJob;
import com.bailuntec.job.SkuMappingSyncJob; import com.bailuntec.job.SkuMappingSyncJob;
import com.bailuntec.job.SkuSimpleCategoryJob; import com.bailuntec.job.SkuSimpleCategoryJob;
...@@ -28,6 +29,7 @@ public class Application { ...@@ -28,6 +29,7 @@ public class Application {
private static final String EVENT_RDB_STORAGE_PASSWORD = propertiesUtil.getPropertyAsString("EVENT_RDB_STORAGE_PASSWORD"); private static final String EVENT_RDB_STORAGE_PASSWORD = propertiesUtil.getPropertyAsString("EVENT_RDB_STORAGE_PASSWORD");
public static void main(String[] args) { public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init(); new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
new JobScheduler(createRegistryCenter(), createJobConfiguration3()).init();
new JobScheduler(createRegistryCenter(), createJobConfiguration1()).init(); new JobScheduler(createRegistryCenter(), createJobConfiguration1()).init();
new JobScheduler(createRegistryCenter(), createJobConfiguration2()).init(); new JobScheduler(createRegistryCenter(), createJobConfiguration2()).init();
} }
...@@ -58,6 +60,13 @@ public class Application { ...@@ -58,6 +60,13 @@ public class Application {
return simpleJobRootConfig; return simpleJobRootConfig;
} }
private static LiteJobConfiguration createJobConfiguration3() {
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(propertiesUtil.getPropertyAsString("JOB_NAME_COROUTINE"), propertiesUtil.getPropertyAsString("JOB_CRON_COROUTINE"), propertiesUtil.getPropertyAsInt("SHARDING_TOTAL_COUNT")).build();
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SkuCoroutineMSSyncJob.class.getCanonicalName());
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
private static JobEventConfiguration createJobEventConfiguration() { private static JobEventConfiguration createJobEventConfiguration() {
JobEventConfiguration jobEventRdbConfig = new JobEventRdbConfiguration(setUpEventTraceDataSource()); JobEventConfiguration jobEventRdbConfig = new JobEventRdbConfiguration(setUpEventTraceDataSource());
return jobEventRdbConfig; return jobEventRdbConfig;
......
package com.bailuntec.job;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bailuntec.domain.entity.DcBaseSku;
import com.bailuntec.domain.entity.DcBaseSkuWarehouse;
import com.bailuntec.domain.entity.JobPointLog;
import com.bailuntec.domain.example.DcBaseSkuExample;
import com.bailuntec.domain.example.DcBaseSkuWarehouseExample;
import com.bailuntec.domain.pojo.SkuInfo;
import com.bailuntec.domain.pojo.SkuMsResponse;
import com.bailuntec.domain.pojo.SkuMsResult;
import com.bailuntec.domain.pojo.WarehouseInfo;
import com.bailuntec.domain.request.SkuCondition;
import com.bailuntec.domain.request.SkumsRequest;
import com.bailuntec.mapper.DcBaseSkuMapper;
import com.bailuntec.mapper.DcBaseSkuWarehouseMapper;
import com.bailuntec.mapper.JobPointLogMapper;
import com.bailuntec.support.PointJob;
import com.bailuntec.utils.OkHttpUtil;
import com.bailuntec.utils.PropertiesUtil;
import com.bailuntec.utils.SessionUtil;
import com.dangdang.ddframe.job.api.ShardingContext;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;
@Slf4j
public class SkuCoroutineMSSyncJob extends PointJob {
private PropertiesUtil propertiesUtil = PropertiesUtil.getInstance("const");
private OkHttpClient client = OkHttpUtil.getInstance();
@Override
public void executeJob(ShardingContext shardingContext, JobPointLog jobPointLog) {
SkumsRequest skumsRequest = new SkumsRequest();
skumsRequest.setConditionJson(JSON.toJSONString(new SkuCondition(jobPointLog.getStartTime().minusMinutes(3), jobPointLog.getEndTime())));
skumsRequest.setPageNumber(jobPointLog.getPageSize());
do {
MediaType mediaType = MediaType.parse("application/json");
skumsRequest.setPageIndex(jobPointLog.getPageIndex() > 0?jobPointLog.getPageIndex() : 1);
log.warn(JSON.toJSONString(skumsRequest));
RequestBody body = RequestBody.create(mediaType, JSON.toJSONString(skumsRequest));
Request request = new Request.Builder()
.url(propertiesUtil.getPropertyAsString("SKU_URL"))
.post(body)
.addHeader("Content-Type", "application/json")
.build();
Response response = null;
String resultStr = null;
try {
response = client.newCall(request).execute();
resultStr = response.body().string();
} catch (IOException e) {
throw new RuntimeException("调用SKUMS系统接口失败", e);
} finally {
if (response != null) {
response.close();
}
}
if (StringUtils.isNotBlank(resultStr)) {
SkuMsResponse skuMsResponse = JSONObject.parseObject(resultStr, SkuMsResponse.class);
if (skuMsResponse.getStatusCode()!= null && skuMsResponse.getStatusCode().equals(200)) {
SkuMsResult result = skuMsResponse.getResult();
if (result != null) {
List<SkuInfo> data = result.getData();
if (data != null && data.size() > 0) {
handleSkuMsJson(data,jobPointLog);
}
if (jobPointLog.getPageIndex().equals(0) && result.getTotalPage() != null) {
jobPointLog.setPageIndex(result.getTotalPage() + 1);
}
}
} else {
throw new RuntimeException("调用SKUMS系统接口返回错误");
}
} else {
throw new RuntimeException("调用SKUMS系统接口返回null");
}
jobPointLog.setPageIndex(jobPointLog.getPageIndex() - 1);
} while (jobPointLog.getPageIndex() > 0);
jobPointLog.setPageIndex(0);
jobPointLog.setStartTime(jobPointLog.getEndTime());
jobPointLog.setEndTime(jobPointLog.getStartTime().plusDays(jobPointLog.getIntervalTime().longValue()).isAfter(LocalDateTime.now()) ? LocalDateTime.now() : jobPointLog.getStartTime().plusDays(jobPointLog.getIntervalTime().longValue()));
}
private void handleSkuMsJson(List<SkuInfo> data,JobPointLog jobPointLog) {
DcBaseSku dcBaseSku = new DcBaseSku();
DcBaseSkuWarehouse dcBaseSkuWarehouse = new DcBaseSkuWarehouse();
try {
DcBaseSkuMapper mapper = SessionUtil.getSession().getMapper(DcBaseSkuMapper.class);
DcBaseSkuWarehouseMapper dcBaseSkuWarehouseMapper = SessionUtil.getSession().getMapper(DcBaseSkuWarehouseMapper.class);
for (SkuInfo skuInfo : data) {
if (StringUtils.isNotBlank(skuInfo.getBailunSku())) {
//逻辑删除仓库
dcBaseSkuWarehouseMapper.logicDeleteWarehouse(skuInfo.getBailunSku(),skuInfo.getCompanyId());
try {
BeanUtils.copyProperties(dcBaseSku, skuInfo);
} catch (Exception e) {
throw new RuntimeException("BeanUtils.copyProperties失败");
}
List<WarehouseInfo> warehouseList = skuInfo.getWarehouseList();
if (warehouseList != null && warehouseList.size() > 0) {
for (WarehouseInfo warehouseInfo : warehouseList) {
try {
BeanUtils.copyProperties(dcBaseSkuWarehouse, warehouseInfo);
} catch (Exception e) {
throw new RuntimeException("BeanUtils.copyProperties失败");
}
dcBaseSkuWarehouse.setBailunSku(skuInfo.getBailunSku());
dcBaseSkuWarehouse.setCompanyId(skuInfo.getCompanyId());
dcBaseSkuWarehouse.setStatus(0);
dcBaseSkuWarehouse.setGmtModified(LocalDateTime.now());
int i = dcBaseSkuWarehouseMapper.updateByExampleSelective(dcBaseSkuWarehouse, DcBaseSkuWarehouseExample.newAndCreateCriteria().andBailunSkuEqualTo(dcBaseSkuWarehouse.getBailunSku()).andCompanyIdEqualTo(dcBaseSkuWarehouse.getCompanyId()).andWarehouseCodeEqualTo(dcBaseSkuWarehouse.getWarehouseCode()).example());
if (i == 0) {
dcBaseSkuWarehouseMapper.insertSelective(dcBaseSkuWarehouse);
}
}
}
dcBaseSku.setGmtModified(LocalDateTime.now());
int v = mapper.updateByExampleSelective(dcBaseSku, DcBaseSkuExample.newAndCreateCriteria().andBailunSkuEqualTo(dcBaseSku.getBailunSku()).example());
if (v == 0) {
mapper.insertSelective(dcBaseSku);
}
}
}
if (jobPointLog.getPageIndex() % 4 == 0) {
JobPointLogMapper jobPointLogMapper = SessionUtil.getSession().getMapper(JobPointLogMapper.class);
jobPointLogMapper.upsertSelective(jobPointLog);
}
} catch (RuntimeException e) {
throw new RuntimeException("MYBATIS操作SKU失败",e);
} finally {
SessionUtil.closeSession();
}
}
}
...@@ -8,9 +8,10 @@ EVENT_RDB_STORAGE_USERNAME=root ...@@ -8,9 +8,10 @@ EVENT_RDB_STORAGE_USERNAME=root
EVENT_RDB_STORAGE_PASSWORD=#7kfnymAM$Y9-Ntf EVENT_RDB_STORAGE_PASSWORD=#7kfnymAM$Y9-Ntf
ZOOKEEPER_SERVER=172.31.255.120:2181 ZOOKEEPER_SERVER=172.31.255.120:2181
NAME_SPACE=data-center NAME_SPACE=data-center
#JOB_NAME=base-sync-sku JOB_NAME=base-sync-sku
JOB_NAME=base-sync-sku-coroutine JOB_NAME_COROUTINE=base-sync-sku-coroutine
JOB_CRON=0/10 * * * * ? * JOB_CRON=0/10 * * * * ? *
JOB_CRON_COROUTINE=0/1 * * * * ? *
JOB_SIMPLE_CATEGORY_NAME=base-sync-sku-simple-category JOB_SIMPLE_CATEGORY_NAME=base-sync-sku-simple-category
JOB_SIMPLE_CATEGORY_CRON=0 0 23 * * ? * JOB_SIMPLE_CATEGORY_CRON=0 0 23 * * ? *
JOB_MAPPING_NAME=base-sync-sku-mapping JOB_MAPPING_NAME=base-sync-sku-mapping
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment