资源条件有限,需要支持智搜的数据量也不大,上 ES 搜索有点大材小用了,只好写个简版的 MySQL 智搜:处理全文搜索,支持拼音搜索、中文分词、自定义分词断词、地图范围搜索、周边搜索、自定义多边形搜索。
通过设置定时SpringBatch抽取需要进行检索的表,并将检索的标题,概要,内容等存入检索表中。通过检索表进行全文检索。
代码主要包:通过网盘分享的文件:mysql智搜.zip 链接: https://pan.baidu.com/s/1MMhmyVD8o56Grp1Aa5IiKQ 提取码: 0530
1、提供的接口
@RestController
@Tag(name = "智搜全局搜索API")
@RequestMapping(path = "/smart/serach")
public class SearchIndexController {@Resource
private SearchIndexService searchIndexService;@PostMapping("/smartSearch")
@Operation(summary = "智搜数据查询")
@Log(title = "一张图智搜数据查询", businessType = BusinessType.QUERY)
public Response> getSearchIndexDataByPage (@RequestBody SearchIndexParamDTO searchIndexParam) {
return Response.with(searchIndexService.getSearchIndexDataByPage(searchIndexParam));
}@PostMapping("/getSearchIndexByPage")
@Operation(summary = "普通智搜内容分页查询")
@Log(title = "普通智搜内容分页查询", businessType = BusinessType.QUERY)
public Response> getSearchIndexByPage (@RequestBody SearchIndexParamDTO searchIndexParam) {
return Response.with(searchIndexService.getSearchIndexByPage(searchIndexParam));
}@PostMapping("/getSearchIndexById")
@Operation(summary = "通过主键获取检索详细信息")
@Log(title = "普通智搜内容分页查询", businessType = BusinessType.QUERY)
public Response getSearchIndexById (@RequestBody SearchIndexParamDTO searchIndexParam) {
return Response.with(searchIndexService.getSearchIndexById(searchIndexParam));
}@PostMapping("/smartSearchRiver")
@Operation(summary = "智搜溯源河流数据查询")
@Log(title = "一张图智搜溯源河流数据查询", businessType = BusinessType.QUERY)
public Response getSmartSearchRiver (@RequestBody SearchPointParamDTO searchIndexParam) {
return Response.with(searchIndexService.getSmartSearchRiver(searchIndexParam));
}}
2、service
/**
 * Service contract behind the smart-search endpoints.
 *
 * NOTE(review): the generic type arguments of IService and Page were lost in
 * the published listing; restore them from the real source file.
 */
public interface SearchIndexService extends IService {
/** Plain paged search over the search index. */
Page getSearchIndexByPage (SearchIndexParamDTO searchIndexParamDTO) ;
/**
 * getSearchIndexById: loads one index row plus the source record it points to.
 * getSearchIndexDataByPage: paged search used by the map ("smartSearch") endpoint.
 */
SearchIndexDTO getSearchIndexById (SearchIndexParamDTO searchIndexParamDTO) ;Page getSearchIndexDataByPage (SearchIndexParamDTO searchIndexParamDTO) ;
/** River-tracing query between a start and an end coordinate. */
String getSmartSearchRiver (SearchPointParamDTO searchIndexParam) ;}
/**
 * Default {@link SearchIndexService} implementation.
 *
 * <p>Search text is normalised before it reaches the mapper: input recognised
 * as pinyin is converted with {@code KeyWordUtils.getPinyin}, anything else is
 * run through HanLP. An optional GeoJSON polygon is converted to WKT and stored
 * on the parameter object.
 */
@Service
@Slf4j
@RequiredArgsConstructor
@DS("master")
public class SearchIndexServiceImpl extends ServiceImpl<SearchIndexMapper, SearchIndex> implements SearchIndexService {

    @Resource
    private SearchIndexMapper searchIndexMapper;

    @Resource
    private DataSourceService dataSourceService;

    @Resource
    private DynamicDataSourcesService dynamicDataSourcesService;

    /**
     * Plain paged search.
     *
     * @param searchIndexParamDTO search text, paging and optional GeoJSON filter;
     *                            its search content and polygon WKT are rewritten in place
     * @return the page returned by the mapper
     */
    @Override
    public Page getSearchIndexByPage(SearchIndexParamDTO searchIndexParamDTO) {
        if (searchIndexParamDTO.getSearchContent() != null) {
            if (KeyWordUtils.isContainsPinyin(searchIndexParamDTO.getSearchContent())) {
                searchIndexParamDTO.setSearchContent(
                        KeyWordUtils.getPinyin(searchIndexParamDTO.getSearchContent(), CommonConstant.SPACE));
            } else {
                searchIndexParamDTO.setSearchContent(
                        HanLpAdvancedUtil.extractKeywords(searchIndexParamDTO.getSearchContent()));
            }
        }
        applyPolygonWkt(searchIndexParamDTO);
        return searchIndexMapper.getSearchIndexByPage(searchIndexParamDTO.build(), searchIndexParamDTO);
    }

    /**
     * Loads an index row, then fetches the source record it references.
     *
     * @param searchIndexParamDTO must carry a non-empty {@code pkId}
     * @return the index row, the column list of the source table and the first
     *         matching source record ({@code null} detail when none is found)
     * @throws CoreException when {@code pkId} is missing or no index row exists for it
     */
    @Override
    public SearchIndexDTO getSearchIndexById(SearchIndexParamDTO searchIndexParamDTO) {
        if (StrUtil.isEmpty(searchIndexParamDTO.getPkId())) {
            throw new CoreException(ErrorCodeEnum.SYS_ERROR, "查询参数缺失");
        }
        SearchIndex searchIndex = searchIndexMapper.selectById(searchIndexParamDTO.getPkId());
        if (searchIndex == null) {
            // An unknown id used to surface as a NullPointerException a line later.
            throw new CoreException(ErrorCodeEnum.SYS_ERROR, "检索记录不存在");
        }

        // Table and key-column names cannot be bound as JDBC parameters, so they
        // are concatenated. They are read from the stored index row, not from the
        // request; only the key value is bound.
        String sql = "select * from " + searchIndex.getEntityTable()
                + " where " + searchIndex.getEntityKeyColumn() + " = ?";

        DataSourceDTO dataSourceDTO = dataSourceService.findById(searchIndex.getEntitySourceId());
        List columnList = dynamicDataSourcesService.getDbColumnList(dataSourceDTO, searchIndex.getEntityTable());

        List jdbcParamValues = new ArrayList<>();
        jdbcParamValues.add(searchIndex.getEntityId());
        // NOTE(review): element type assumed to be Map<String, Object>; the
        // original type arguments were lost in the published listing.
        List<Map<String, Object>> listInfo =
                dynamicDataSourcesService.executeListSql(dataSourceDTO, sql, jdbcParamValues);

        SearchIndexDTO searchIndexDTO = new SearchIndexDTO();
        BeanUtilCopy.copyProperties(searchIndex, searchIndexDTO);
        searchIndexDTO.setColumnList(columnList);
        searchIndexDTO.setDetailInfo(CollUtil.isNotEmpty(listInfo) ? listInfo.get(0) : null);
        return searchIndexDTO;
    }

    /**
     * Paged search for the map view. Differs from {@link #getSearchIndexByPage}
     * in how the text is normalised and in defaulting the entity types to
     * "3".."7" when the caller supplies none.
     *
     * @param searchIndexParamDTO search text, paging, optional GeoJSON filter and entity types
     * @return the page returned by the mapper
     */
    @Override
    public Page getSearchIndexDataByPage(SearchIndexParamDTO searchIndexParamDTO) {
        if (searchIndexParamDTO.getSearchContent() != null) {
            if (KeyWordUtils.isContainsPinyin(searchIndexParamDTO.getSearchContent())) {
                searchIndexParamDTO.setSearchContent(KeyWordUtils.getPinyin(
                        HanLpAdvancedUtil.extractKeywords(searchIndexParamDTO.getSearchContent()),
                        CommonConstant.SPACE));
            } else {
                searchIndexParamDTO.setSearchContent(
                        HanLpAdvancedUtil.segmentToString(searchIndexParamDTO.getSearchContent(), Boolean.TRUE));
            }
        }
        applyPolygonWkt(searchIndexParamDTO);
        if (CollUtil.isEmpty(searchIndexParamDTO.getEntityTypes())) {
            searchIndexParamDTO.setEntityTypes(Arrays.asList("3", "4", "5", "6", "7"));
        }
        return searchIndexMapper.getSearchIndexByPage(searchIndexParamDTO.build(), searchIndexParamDTO);
    }

    /**
     * Extracts a sub-section of a line between two coordinates.
     *
     * @param searchIndexParam start/end coordinates; a missing max distance defaults to 5 (km)
     * @return the string produced by {@code GeoJsonUtil.extractLineSubset}
     */
    @Override
    public String getSmartSearchRiver(SearchPointParamDTO searchIndexParam) {
        if (searchIndexParam.getMaxDistanceKm() == null) {
            searchIndexParam.setMaxDistanceKm(5D);
        }
        return GeoJsonUtil.extractLineSubset(
                searchIndexParam.getLongitudeStart(),
                searchIndexParam.getLatitudeStart(),
                searchIndexParam.getLongitudeEnd(),
                searchIndexParam.getLatitudeEnd(),
                searchIndexParam.getMaxDistanceKm());
    }

    /** Converts the request's GeoJSON polygon, when present, to WKT and stores it on the DTO. */
    private void applyPolygonWkt(SearchIndexParamDTO searchIndexParamDTO) {
        if (ObjectUtil.isNotEmpty(searchIndexParamDTO.getGeoJson())) {
            searchIndexParamDTO.setPolygonWKT(GeoJsonUtil.convertGeoJsonToWKT(
                    GeoJsonUtil.extractPolygonCoordinates(searchIndexParamDTO.getGeoJson())));
        }
    }
}
3、数据库表设计
数据库表
-- Configuration table for the search-index job: one row per (category, source table).
-- The *_column fields hold the NAMES of columns in the source table (comma-separated
-- where noted), not the data itself. The batch job reads rows with
-- deleted = 0 and update_flag = '1'.
-- NOTE(review): the FULLTEXT index below covers title_column/keywords_column, which
-- store column names rather than searchable text — confirm it is actually needed.
DROP TABLE IF EXISTS `ads_search_index_table`;
CREATE TABLE `ads_search_index_table` (
`pk_id` int NOT NULL AUTO_INCREMENT COMMENT '主键' ,
`entity_type` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '分类' ,
`entity_type_name` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '分类名称' ,
`entity_key_column` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '主键字段' ,
`entity_table` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '实体对应的表名' ,
`entity_table_name` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体对应的表名中文' ,
`entity_source_id` varchar(32 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体所在数据源,目前只支持当前库' ,
`title_column` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '标题抽入字段,逗号分割' ,
`sumary_column` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '摘要信息字段,逗号分割' ,
`content_column` varchar(200 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '内容content需要抽入的字段英文逗号分割' ,
`keywords_default` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '关键词默认值' ,
`keywords_column` varchar(200 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '关键词keywords要抽入的字段英文逗号分割' ,
`weight_default` int NULL DEFAULT NULL COMMENT '默认权重' ,
`update_flag` varchar(10 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '1' COMMENT '是否更新1是0否' ,
`lng_column` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '经度字段' ,
`lat_column` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '纬度字段' ,
`delete_column` varchar(20 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '逻辑删除字段' ,
`region_code_column` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划编码' ,
`file_url_column` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '文件地址字段' ,
`deploy_time_column` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据发布时间字段' ,
`source_from` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据来源' ,
`revision` int NULL DEFAULT 0 COMMENT '乐观锁' ,
`created_by` varchar(32 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '创建人' ,
`created_time` datetime NULL DEFAULT NULL COMMENT '创建时间' ,
`updated_by` varchar(32 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '更新人' ,
`updated_time` datetime NULL DEFAULT NULL COMMENT '更新时间' ,
`deleted` int NULL DEFAULT 0 COMMENT '删除标志:0-未删除;1-已删除' ,
PRIMARY KEY (`pk_id`) USING BTREE,
FULLTEXT INDEX `ft_search`(`title_column`, `keywords_column`) WITH PARSER `ngram`
) ENGINE = InnoDB AUTO_INCREMENT = 14 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '全局搜索表数据来源信息' ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of ads_search_index_table
-- ----------------------------
INSERT INTO `ads_search_index_table` VALUES (8 ,'8' ,'新闻' ,'pk_id' ,'test_environmental_news' ,'新闻表' ,'1' ,'news_title' ,'news_sumary' ,'news_content' ,'新闻' ,'news_title,news_content' ,80 ,'0' ,'' ,'' ,'' ,'region_code' ,'attachment_url' ,'publish_time' ,'某某单位' ,19 , NULL, NULL, 'xxxx@qq.com' ,'2025-05-26 16:22:58' ,0 );
INSERT INTO `ads_search_index_table` VALUES (9 ,'9' ,'数据' ,'PK_ID' ,'ads_t_app_interface' ,'某接口表' ,'1' ,'interface_name' ,'interface_name' ,'interface_url' ,'某接口' ,'interface_name' ,60 ,'0' , NULL, NULL, '' , NULL, NULL, 'created_time' ,'某信息中心' ,5 , NULL, NULL, 'xxxx@qq.com' ,'2025-05-20 17:08:28' ,0 );
INSERT INTO `ads_search_index_table` VALUES (10 ,'10' ,'文件' ,'pk_id' ,'test_environmental_news' ,'新闻表' ,'1' ,'news_title' ,'news_sumary' ,'news_content' ,'文件' ,'news_title,news_content' ,60 ,'0' ,'' ,'' ,'' ,'region_code' ,'attachment_url' ,'publish_time' ,'某某单位' ,6 , NULL, NULL, 'xxxx@qq.com' ,'2025-05-26 16:22:58' ,0 );
INSERT INTO `ads_search_index_table` VALUES (11 ,'11' ,'地图' ,'id' ,'test_dust_emission_source' ,'扬尘源' ,'1' ,'project_name' ,'project_name' ,'project_name' ,'地图' ,'project_name' ,60 ,'0' ,'longitude' ,'latitude' ,'' ,'region_code' ,'' ,'create_time' ,'某某单位' ,5 , NULL, NULL, 'xxx@qq.com' ,'2025-05-20 17:08:28' ,0 );
INSERT INTO `ads_search_index_table` VALUES (12 ,'12' ,'应用' ,'app_id' ,'ads_t_sys_app' ,'应用' ,'1' ,'name' ,'name' ,'description' ,'应用' ,'name' ,60 ,'0' ,'' ,'' ,'' ,'' ,'' ,'created_time' ,'某某单位' ,7 , NULL, NULL, 'xxxx@qq.com' ,'2025-05-26 16:22:58' ,0 );
INSERT INTO `ads_search_index_table` VALUES (13 ,'13' ,'公告' ,'pk_id' ,'test_environmental_announcement' ,'公告表' ,'1' ,'title' ,'summary' ,'content' ,'公告' ,'title,content' ,60 ,'0' ,'' ,'' ,'' ,'region_code' ,'attachment_url' ,'publish_time' ,'某某单位' ,5 , NULL, NULL, 'xxxx@qq.com' ,'2025-05-26 16:22:58' ,0 );DROP TABLE IF EXISTS `ads_search_index`;
-- The search index itself: one row per indexed source record, filled by the batch job.
-- The FULLTEXT index `ft_search` spans title, content, keywords and pingyin_keywords and
-- declares no parser, so the default full-text parser applies; the job stores
-- content/keywords after running them through HanLP.
-- Old rows are soft-deleted (deleted = '1') by the job before fresh rows are inserted.
-- NOTE(review): `location_point` is not among the columns written by the job's INSERT
-- statement — presumably it is populated elsewhere (trigger or follow-up update); verify.
CREATE TABLE `ads_search_index` (
`pk_id` varchar(32 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '主键' ,
`entity_type` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '分类' ,
`entity_type_name` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '分类名称' ,
`entity_key_column` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '主键字段' ,
`entity_id` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '对应实体表的主键' ,
`entity_table` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体对应的表名' ,
`entity_table_name` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体对应的表中文名' ,
`entity_source_id` varchar(32 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体所在数据源' ,
`title` varchar(300 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '标题' ,
`sumary` varchar(500 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '摘要' ,
`content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '内容' ,
`keywords` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '关键词或标签' ,
`pingyin_keywords` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '拼音分词' ,
`weight` int NULL DEFAULT NULL COMMENT '权重或排序' ,
`lng` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '经度' ,
`lat` varchar(100 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '纬度' ,
`region_code` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划编码' ,
`region_name` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划名称' ,
`file_url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '文件地址逗号分割' ,
`deploy_time` datetime NULL DEFAULT NULL COMMENT '数据来源发布时间' ,
`source_from` varchar(50 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据来源' ,
`revision` int NULL DEFAULT 0 COMMENT '乐观锁' ,
`created_by` varchar(32 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '创建人' ,
`created_time` datetime NULL DEFAULT NULL COMMENT '创建时间' ,
`updated_by` varchar(32 ) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '更新人' ,
`updated_time` datetime NULL DEFAULT NULL COMMENT '更新时间' ,
`deleted` int NULL DEFAULT 0 COMMENT '删除标志:0-未删除;1-已删除' ,
`location_point` point NULL,
PRIMARY KEY (`pk_id`) USING BTREE,
INDEX `idx_search_index`(`entity_type`, `entity_table`, `deleted`) USING BTREE,
FULLTEXT INDEX `ft_search`(`title`, `content`, `keywords`, `pingyin_keywords`)
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '全局搜索表' ROW_FORMAT = DYNAMIC;
4、SpringBatch抽取数据
依赖
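<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>4.3.10</version>
</dependency>
<dependency>
    <groupId>com.belerweb</groupId>
    <artifactId>pinyin4j</artifactId>
    <version>2.5.1</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-shapefile</artifactId>
    <version>${geotools.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.geotools</groupId>
            <artifactId>gt-main</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-main</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-geojson</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-swing</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-epsg-hsql</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.locationtech.jts</groupId>
    <artifactId>jts-core</artifactId>
    <version>1.18.1</version>
</dependency>
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty-http</artifactId>
    <version>1.0.39</version>
</dependency>
<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
    <version>2012_u6</version>
</dependency>
<dependency>
    <groupId>com.hankcs</groupId>
    <artifactId>hanlp</artifactId>
    <version>portable-1.8.4</version>
</dependency>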
分词hanlp.properties配置
# HanLP 根路径
#root=D:/tools/hanlp/
root=/data/hanlp/
# 核心词典
CoreDictionaryPath=data/dictionary/CoreNatureDictionary.txt
# 自定义词典路径
CustomDictionaryPath=data/dictionary/custom/CustomDictionary.txt;data/dictionary/custom/hanyucidianDic.txt;data/dictionary/custom/siteDic.txt
# 断词词典路径
CoreStopWordDictionaryPath=data/dictionary/stopwords.txt
# 分词线程数
SegmentThreadNumber=4
# 是否显示词性标注信息
ShowTermNature=false
# 日志级别
HanLPLogLevel=WARN
SpringBatch配置
application.yml
spring:
  batch:
    job:
      enabled: false # 启动时不启动job
    jdbc:
      initialize-schema: always
    # 是否定时任务执行
    scheduler:
      enabled: true
  sql:
    init:
      schema-locations: classpath:/org/springframework/batch/core/schema-mysql.sql
/**
 * Cron trigger for the search-index rebuild job.
 *
 * <p>The job only runs when {@code spring.batch.scheduler.enabled} is
 * {@code true}; the property defaults to {@code false}.
 */
@Component
public class ScheduledBatchTask {

    @Resource
    @Qualifier("searchIndexCreateTaskJob")
    private Job searchIndexCreateTaskJob;

    @Resource
    JobLauncher jobLauncher;

    @Value("${spring.batch.scheduler.enabled:false}")
    private boolean schedulerEnabled;

    /**
     * Launches the index rebuild according to the cron expression
     * {@code 0 0 3 * * ?}.
     *
     * @throws Exception whatever {@code JobLauncher.run} throws
     */
    @Scheduled(cron = "0 0 3 * * ?")
    public void searchIndexCreateTask() throws Exception {
        if (!schedulerEnabled) {
            return;
        }
        // The current timestamp makes every launch use distinct job parameters.
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(searchIndexCreateTaskJob, jobParameters);
    }
}
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowired
private JobBuilderFactory jobBuilderFactory;@Autowired
private StepBuilderFactory stepBuilderFactory;@Autowired
private SqlSessionFactory sqlSessionFactory;
@Bean("batchTaskExecutor")
public TaskExecutor taskExecutor () {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
executor.setCorePoolSize(5 );
executor.setMaxPoolSize(10 );
executor.setThreadNamePrefix("table-processor-" );
return executor;
}
@Bean("searchIndexCreateTaskJob")
public Job searchIndexCreateTaskJob (
@Qualifier("deleteOldDataStep") Step deleteOldDataStep,
@Qualifier("masterSearchStep") Step masterStep
) {
return jobBuilderFactory.get("searchIndexCreateTaskJob" )
.start(deleteOldDataStep)
.next(masterStep)
.listener(new BatchSearchJobListener ())
.build();
}@Bean
public Step deleteOldDataStep (@Qualifier("deleteTasklet") Tasklet deleteTasklet) {
return stepBuilderFactory.get("deleteOldDataStep" )
.tasklet(deleteTasklet)
.build();
}
@Bean("masterSearchStep")
public Step masterStep (
@Qualifier("updateSearchIndexData") Step updateSearchIndexData,
@Qualifier("multiIndexTablePartitioner") MultiIndexTablePartitioner multiIndexTablePartitioner
) {
return stepBuilderFactory.get("masterSearchStep" )
.partitioner(updateSearchIndexData.getName(), multiIndexTablePartitioner)
.step(updateSearchIndexData)
.gridSize(10 )
.taskExecutor(batchSearchTaskExecutor())
.build();
}
@Bean("batchSearchTaskExecutor")
public TaskExecutor batchSearchTaskExecutor () {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
executor.setCorePoolSize(5 );
executor.setMaxPoolSize(10 );
executor.setThreadNamePrefix("search-processor-" );
return executor;
}
@Bean("updateSearchIndexData")
public Step updateSearchIndexData (
@Qualifier("searchTableReader") MyBatisPagingItemReader> myBatisPagingItemReader,
@Qualifier("batchSearchIndexWriter") BatchSearchIndexWriter batchSearchIndexWriter,
@Qualifier("searchIndexProcessor") SearchIndexProcessor searchIndexProcessor
) {
return stepBuilderFactory.get("updateSearchIndexData" )
., Map>chunk(100 )
.reader(myBatisPagingItemReader)
.processor(searchIndexProcessor)
.writer(batchSearchIndexWriter)
.faultTolerant()
.skipPolicy(new AlwaysSkipItemSkipPolicy ())
.build();
}
@Bean("searchTableReader")
@StepScope
public MyBatisPagingItemReader> searchTableReader (
@Value("#{stepExecutionContext['entityType']}") String entityType, //分类
@Value("#{stepExecutionContext['entityTypeName']}") String entityTypeName, //分类
@Value("#{stepExecutionContext['entityKeyColumn']}") String entityKeyColumn,// 主键字段
@Value("#{stepExecutionContext['entityTable']}") String entityTable,// 表名
@Value("#{stepExecutionContext['entityTableName']}") String entityTableName,// 表名
@Value("#{stepExecutionContext['entitySourceId']}") String entitySourceId,// 表数据来源
@Value("#{stepExecutionContext['titleColumn']}") String titleColumn, // 标题字段
@Value("#{stepExecutionContext['sumaryColumn']}") String sumaryColumn, // 摘要字段
@Value("#{stepExecutionContext['deployTimeColumn']}") String deployTimeColumn, // 发布时间字段
@Value("#{stepExecutionContext['contentColumn']}") String contentColumn, // 内容字段
@Value("#{stepExecutionContext['sourceFrom']}") String sourceFrom, // 数据来源
@Value("#{stepExecutionContext['keywordsDefault']}") String keywordsDefault, // 默认关键词
@Value("#{stepExecutionContext['keywordsColumn']}") String keywordsColumn, // 关键词字段
@Value("#{stepExecutionContext['fileUrlColumn']}") String fileUrlColumn, // 关键词字段
@Value("#{stepExecutionContext['weightDefault']}") String weightDefault, // 默认权重
@Value("#{stepExecutionContext['deleteColumn']}") String deleteColumn, // 逻辑删除字段
@Value("#{stepExecutionContext['lngColumn']}") String lngColumn, // 逻辑删除字段
@Value("#{stepExecutionContext['latColumn']}") String latColumn, // 逻辑删除字段
@Value("#{stepExecutionContext['regionCodeColumn']}") String regionCodeColumn, // 新区区划字段
@Value("#{stepExecutionContext['updateFlag']}") String updateFlag // 是否需要更新
) {MyBatisPagingItemReader> reader = new MyBatisPagingItemReader <>();
reader.setSqlSessionFactory(sqlSessionFactory);
reader.setQueryId("com.bigdatacd.panorama.system.mapper.SearchIndexTableMapper.selectSearchIndexTableDataByPage" );
Map param = new HashMap <>();
param.put(CommonConstant.ENTITY_TYPE,entityType);
param.put(CommonConstant.ENTITY_TYPE_NAME,entityTypeName);
param.put(CommonConstant.ENTITY_KEY_COLUMN,entityKeyColumn);
param.put(CommonConstant.ENTITY_TABLE,entityTable);
param.put(CommonConstant.ENTITY_TABLE_NAME,entityTableName);
param.put(CommonConstant.SOURCE_FROM,sourceFrom);
param.put(CommonConstant.DEPLOY_TIME_COLUMN,deployTimeColumn);
param.put(CommonConstant.FILE_URL_COLUMN,fileUrlColumn);
param.put(CommonConstant.SUMARY_COLUMN,sumaryColumn);
param.put(CommonConstant.ENTITY_SOURCE_ID,entitySourceId);
param.put(CommonConstant.TITLE_COLUMN,titleColumn);
param.put(CommonConstant.CONTET_COLUMN,contentColumn);
param.put(CommonConstant.KEYWORDS_DEFAULT,keywordsDefault);
param.put(CommonConstant.KEYWORDS_COLUMN,keywordsColumn);
param.put(CommonConstant.WEIGHT_DEFAULT,weightDefault);
param.put(CommonConstant.UPDATE_FLAG,updateFlag);
param.put(CommonConstant.DELETE_COLUMN,deleteColumn);
param.put(CommonConstant.LNG_COLUMN,lngColumn);
param.put(CommonConstant.LAT_COLUMN,latColumn);
param.put(CommonConstant.REGION_CODE_COLUMN,regionCodeColumn);
reader.setParameterValues(param);
reader.setPageSize(1000 );
return reader;
}}
/**
 * Job listener that prints the job's start timestamp and its total elapsed time.
 *
 * <p>The start time is kept in an instance field, so one listener instance must
 * not be shared by jobs that run concurrently.
 */
@Component
public class BatchSearchJobListener implements JobExecutionListener {

    /** Wall-clock time (ms) recorded in {@link #beforeJob}. */
    private long beginTime;

    /** Records the start time and prints it. */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        beginTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + " beforeJob...... " + beginTime);
    }

    /** Prints the elapsed time since {@link #beforeJob} in milliseconds. */
    @Override
    public void afterJob(JobExecution jobExecution) {
        long elapsedMillis = System.currentTimeMillis() - beginTime;
        // Message fixed: the original printed "耗耗时" (duplicated character).
        System.out.println(jobExecution.getJobInstance().getJobName() + "一共耗时:【" + elapsedMillis + "】毫秒");
    }
}
@Component("batchSearchIndexWriter")
@StepScope
public class BatchSearchIndexWriter implements ItemWriter > {@Autowired
private NamedParameterJdbcTemplate jdbcTemplate;@Override
public void write (List> items) {
if (CollUtil.isNotEmpty(items)) {
try {
StringBuilder insertSql = new StringBuilder ();
insertSql.append("insert into ads_search_index(" )
.append("pk_id,entity_type,entity_type_name,entity_key_column,entity_id,entity_source_id,entity_table,entity_table_name,title,sumary,file_url,source_from,deploy_time, " )
.append("content,keywords,pingyin_keywords,weight,lng,lat,region_code,region_name,created_time,updated_time)" )
.append(" values " )
.append("(:pkId,:entityType,:entityTypeName,:entityKeyColumn,:entityId,:entitySourceId,:entityTable,:entityTableName,:title,:sumary,:fileUrl,:sourceFrom,:deployTime, " )
.append(":content,:keywords,:pingyinKeywords,:weight,:lng,:lat,:regionCode,:regionName,:createdTime,:updatedTime)" );jdbcTemplate.batchUpdate(insertSql.toString(), items.stream()
.map(item -> new MapSqlParameterSource ()
.addValue(CommonConstant.PK_ID, UUID.randomUUID().toString().replaceAll("-" , CommonConstant.NULL_STR).toUpperCase())
.addValue(CommonConstant.ENTITY_TYPE, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_TYPE))? item.get(CommonConstant.ENTITY_TYPE) : CommonConstant.NULL_STR)
.addValue(CommonConstant.ENTITY_TYPE_NAME, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_TYPE_NAME))? item.get(CommonConstant.ENTITY_TYPE_NAME) : CommonConstant.NULL_STR)
.addValue(CommonConstant.ENTITY_KEY_COLUMN, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_KEY_COLUMN))? item.get(CommonConstant.ENTITY_KEY_COLUMN) : CommonConstant.NULL_STR)
.addValue(CommonConstant.ENTITY_ID, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_ID))? item.get(CommonConstant.ENTITY_ID) : CommonConstant.NULL_STR)
.addValue(CommonConstant.ENTITY_SOURCE_ID, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_SOURCE_ID))? item.get(CommonConstant.ENTITY_SOURCE_ID) : CommonConstant.NULL_STR)
.addValue(CommonConstant.ENTITY_TABLE, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_TABLE))? item.get(CommonConstant.ENTITY_TABLE) : CommonConstant.NULL_STR)
.addValue(CommonConstant.ENTITY_TABLE_NAME, ObjectUtil.isNotEmpty(item.get(CommonConstant.ENTITY_TABLE_NAME))? item.get(CommonConstant.ENTITY_TABLE_NAME) : CommonConstant.NULL_STR)
.addValue(CommonConstant.TITLE, ObjectUtil.isNotEmpty(item.get(CommonConstant.TITLE))? item.get(CommonConstant.TITLE) : CommonConstant.NULL_STR)
.addValue(CommonConstant.SUMARY, ObjectUtil.isNotEmpty(item.get(CommonConstant.SUMARY))? item.get(CommonConstant.SUMARY) : CommonConstant.NULL_STR)
.addValue(CommonConstant.FILE_URL, ObjectUtil.isNotEmpty(item.get(CommonConstant.FILE_URL))? item.get(CommonConstant.FILE_URL) : CommonConstant.NULL_STR)
.addValue(CommonConstant.SOURCE_FROM, ObjectUtil.isNotEmpty(item.get(CommonConstant.SOURCE_FROM))? item.get(CommonConstant.SOURCE_FROM) : CommonConstant.NULL_STR)
.addValue(CommonConstant.DEPLOY_TIME, ObjectUtil.isNotEmpty(item.get(CommonConstant.DEPLOY_TIME))? item.get(CommonConstant.DEPLOY_TIME) : null )
.addValue(CommonConstant.CONTENT, ObjectUtil.isNotEmpty(item.get(CommonConstant.CONTENT))? item.get(CommonConstant.CONTENT) : CommonConstant.NULL_STR)
.addValue(CommonConstant.KEYWORDS, ObjectUtil.isNotEmpty(item.get(CommonConstant.KEYWORDS))? item.get(CommonConstant.KEYWORDS): CommonConstant.NULL_STR)
.addValue(CommonConstant.PINGYIN_KEYWORDS, ObjectUtil.isNotEmpty(item.get(CommonConstant.PINGYIN_KEYWORDS))? item.get(CommonConstant.PINGYIN_KEYWORDS) : CommonConstant.NULL_STR)
.addValue(CommonConstant.WEIGHT, ObjectUtil.isNotEmpty(item.get(CommonConstant.WEIGHT_DEFAULT))? item.get(CommonConstant.WEIGHT_DEFAULT) : 100 )
.addValue(CommonConstant.LNG, ObjectUtil.isNotEmpty(item.get(CommonConstant.LNG))&& LatLonValidator.isLongitudeValid(item.get(CommonConstant.LNG).toString())? item.get(CommonConstant.LNG) : CommonConstant.ZERO)
.addValue(CommonConstant.LAT, ObjectUtil.isNotEmpty(item.get(CommonConstant.LAT)) && LatLonValidator.isLatitudeValid(item.get(CommonConstant.LAT).toString())? item.get(CommonConstant.LAT) : CommonConstant.ZERO)
.addValue(CommonConstant.REGION_CODE, ObjectUtil.isNotEmpty(item.get(CommonConstant.REGION_CODE))? item.get(CommonConstant.REGION_CODE) : CommonConstant.NULL_STR)
.addValue(CommonConstant.REGION_NAME, ObjectUtil.isNotEmpty(item.get(CommonConstant.REGION_NAME))? item.get(CommonConstant.REGION_NAME) : CommonConstant.NULL_STR)
.addValue(CommonConstant.CREATED_TIME, DateUtil.date())
.addValue(CommonConstant.UPDATED_TIME, DateUtil.date())
)
.toArray(SqlParameterSource[]::new ));
} catch (Exception e) {
e.printStackTrace();
throw new CoreException (ErrorCodeEnum.SYS_ERROR, e.getMessage());
}}
}
}
/**
 * First step of the rebuild job: soft-deletes the existing index rows of every
 * configured source table that is flagged for refresh.
 *
 * <p>For each row of {@code ads_search_index_table} with {@code deleted = 0}
 * and {@code update_flag = '1'}, all live rows in {@code ads_search_index} with
 * the same entity table and entity type are set to {@code deleted = '1'}.
 */
@Component("deleteTasklet")
@StepScope
public class DeleteTasklet implements Tasklet {

    // Only the two columns used by the UPDATE below are selected.
    private static final String SELECT_TABLES_SQL =
            "SELECT entity_type as entityType, entity_table as entityTable"
            + " FROM ads_search_index_table where deleted =0 and update_flag ='1' ";

    private static final String SOFT_DELETE_SQL =
            "update ads_search_index set deleted = '1' where deleted = '0' and entity_table = :entityTable and entity_type = :entityType";

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    /**
     * Runs the soft delete once per configured table.
     *
     * @return always {@code RepeatStatus.FINISHED}
     */
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        List<Map<String, Object>> tables =
                jdbcTemplate.queryForList(SELECT_TABLES_SQL, new MapSqlParameterSource());
        for (Map<String, Object> table : tables) {
            jdbcTemplate.update(SOFT_DELETE_SQL, new MapSqlParameterSource()
                    .addValue(CommonConstant.ENTITY_TABLE, table.get(CommonConstant.ENTITY_TABLE))
                    .addValue(CommonConstant.ENTITY_TYPE, table.get(CommonConstant.ENTITY_TYPE)));
        }
        return RepeatStatus.FINISHED;
    }
}
/**
 * Creates one partition per configured source table.
 *
 * <p>Every row of {@code ads_search_index_table} with {@code deleted = 0} and
 * {@code update_flag = '1'} becomes an {@code ExecutionContext} named
 * {@code partition<i>} that carries the row's configuration as strings.
 */
@Component
@Slf4j
public class MultiIndexTablePartitioner implements Partitioner {

    private static final String SELECT_TABLES_SQL =
            "SELECT "
            + "pk_id as pkId,"
            + "entity_type as entityType,"
            + "entity_type_name as entityTypeName,"
            + "entity_key_column as entityKeyColumn,"
            + "entity_table as entityTable,"
            + "entity_table_name as entityTableName,"
            + "entity_source_id as entitySourceId,"
            + "title_column as titleColumn,"
            + "sumary_column as sumaryColumn,"
            + "file_url_column as fileUrlColumn,"
            + "deploy_time_column as deployTimeColumn,"
            + "source_from as sourceFrom,"
            + "content_column as contentColumn,"
            + "keywords_default as keywordsDefault,"
            + "keywords_column as keywordsColumn,"
            + "update_flag as updateFlag,"
            + "delete_column as deleteColumn,"
            + "lng_column as lngColumn,"
            + "lat_column as latColumn,"
            + "region_code_column as regionCodeColumn,"
            + "weight_default as weightDefault"
            + " FROM ads_search_index_table where deleted =0 and update_flag ='1' ";

    /** Keys copied from each configuration row into its partition context. */
    private static final String[] CONTEXT_KEYS = {
        CommonConstant.PK_ID,
        CommonConstant.ENTITY_TYPE,
        CommonConstant.ENTITY_TYPE_NAME,
        CommonConstant.ENTITY_KEY_COLUMN,
        CommonConstant.ENTITY_TABLE,
        CommonConstant.ENTITY_TABLE_NAME,
        CommonConstant.ENTITY_SOURCE_ID,
        CommonConstant.TITLE_COLUMN,
        CommonConstant.SUMARY_COLUMN,
        CommonConstant.FILE_URL_COLUMN,
        CommonConstant.DEPLOY_TIME_COLUMN,
        CommonConstant.SOURCE_FROM,
        CommonConstant.CONTET_COLUMN,
        CommonConstant.KEYWORDS_COLUMN,
        CommonConstant.KEYWORDS_DEFAULT,
        CommonConstant.WEIGHT_DEFAULT,
        CommonConstant.UPDATE_FLAG,
        CommonConstant.DELETE_COLUMN,
        CommonConstant.LNG_COLUMN,
        CommonConstant.LAT_COLUMN,
        CommonConstant.REGION_CODE_COLUMN,
    };

    private final DataSource dataSource;

    public MultiIndexTablePartitioner(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Builds the partition map.
     *
     * @param gridSize ignored; the number of partitions equals the number of
     *                 configured tables
     * @return partition name to execution context
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        List<Map<String, Object>> tables = jdbcTemplate.queryForList(SELECT_TABLES_SQL);

        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < tables.size(); i++) {
            Map<String, Object> row = tables.get(i);
            ExecutionContext ctx = new ExecutionContext();
            for (String key : CONTEXT_KEYS) {
                // NOTE(review): String.valueOf turns a SQL NULL into the literal
                // text "null". Kept as-is because the mapper that consumes these
                // values is not visible here — verify it copes with "null".
                ctx.putString(key, String.valueOf(row.get(key)));
            }
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    }
}
@Component("searchIndexProcessor")
@Builder
public class SearchIndexProcessor implements ItemProcessor , Map> {@Autowired
private SysRegionService sysRegionService;@Override
public Map process (Map item) {
item.put(CommonConstant.TITLE, item.get(CommonConstant.TITLE).toString());
item.put(CommonConstant.SUMARY, ObjectUtil.isNotEmpty(item.get(CommonConstant.SUMARY))?item.get(CommonConstant.SUMARY).toString():CommonConstant.NULL_STR);
item.put(CommonConstant.CONTENT, HanLpAdvancedUtil.segmentToString(item.get(CommonConstant.CONTENT).toString()));
item.put(CommonConstant.KEYWORDS, HanLpAdvancedUtil.extractKeywords(item.get(CommonConstant.KEYWORDS).toString()));
item.put(CommonConstant.PINGYIN_KEYWORDS, KeyWordUtils.getPinyin(item.get(CommonConstant.KEYWORDS).toString(), CommonConstant.SPACE));
if (ObjectUtil.isNotEmpty(item.get(CommonConstant.REGION_CODE))) {
if (item.get(CommonConstant.REGION_CODE).toString().length() < CommonConstant.TWELVE_INT) {
item.put(CommonConstant.REGION_CODE, StrUtil.padAfter(item.get(CommonConstant.REGION_CODE).toString(), CommonConstant.TWELVE_INT, CommonConstant.ZERO));
}
if (ObjectUtil.isNotEmpty(sysRegionService.getRegionNameByCode(item.get(CommonConstant.REGION_CODE).toString()))) {
item.put(CommonConstant.REGION_NAME, sysRegionService.getRegionNameByCode(item.get(CommonConstant.REGION_CODE).toString()));
} else {
item.put(CommonConstant.REGION_CODE, CommonConstant.NULL_STR);
item.put(CommonConstant.REGION_NAME, CommonConstant.NULL_STR);
}
}
return item;
}
}
效果图