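1. Introduction
When a system needs to import a large amount of data from a database into Elasticsearch, Spring Batch can make the import more efficient. Spring Batch reads data page by page with an ItemReader and writes it in batches with an ItemWriter. Because Spring Batch does not ship an ItemWriter or ItemReader for Elasticsearch, this example defines a custom ElasticsearchItemWriter (and ElasticsearchItemReader) for bulk import.
Source code: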
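The read-then-write-in-batches idea described above can be sketched in plain Java. This is only an illustration of chunk-oriented processing, not Spring Batch's actual implementation; the class name `ChunkLoopSketch` and the method `run` are hypothetical, and the reader is assumed to signal the end of the data by returning null.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java illustration of chunk-oriented processing (hypothetical names, no Spring involved).
class ChunkLoopSketch {

    /**
     * Pulls items from the reader until it returns null and hands every full
     * (or final, partially filled) chunk to the writer.
     * Returns the number of writer calls.
     */
    static <T> int run(Supplier<T> reader, Consumer<List<T>> writer, int chunkSize) {
        int writes = 0;
        List<T> chunk = new ArrayList<>(chunkSize);
        T item;
        while ((item = reader.get()) != null) {
            chunk.add(item);
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // one batch write per full chunk
                chunk.clear();
                writes++;
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk));     // flush the last, partial chunk
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            rows.add(i);
        }
        Iterator<Integer> source = rows.iterator();
        List<Integer> chunkSizes = new ArrayList<>();

        int writes = ChunkLoopSketch.<Integer>run(
                () -> source.hasNext() ? source.next() : null,
                chunk -> chunkSizes.add(chunk.size()),
                10);

        // prints "3 writes, sizes [10, 10, 5]"
        System.out.println(writes + " writes, sizes " + chunkSizes);
    }
}
```

With 25 rows and a chunk size of 10, the writer is called three times instead of 25, which is where the efficiency gain of batch writing comes from.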
2. Example
2.1 pom.xml
This article connects to ES with Spring Data Jest (Spring Data Elasticsearch would also work). The ES version is 5.5.3.
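<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots><enabled>true</enabled></snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots><enabled>false</enabled></snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots><enabled>true</enabled></snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots><enabled>false</enabled></snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>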
2.2 Entity classes and repositories
package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Created by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
    @Id
    private Long id;
    private String name;
    @OneToOne
    @Field(type = FieldType.Nested)
    private Address address;
}
package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * Created by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
    @Id
    private Long id;
    private String name;
}
package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Created by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}
package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Created by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
2.3 Configuring ElasticsearchItemWriter
package com.hfcsbc.esetl.itemwriter;

import com.hfcsbc.esetl.domain.Person;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * Created by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

    private final EsPersonRepository personRepository;

    public ElasticsearchItemWriter(EsPersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @Override
    public void beforeWrite(List<? extends Person> items) {
    }

    @Override
    public void afterWrite(List<? extends Person> items) {
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Returning null leaves the step's exit status unchanged.
        return null;
    }

    @Override
    public void write(List<? extends Person> items) throws Exception {
        // saveAll in the implementation class AbstractElasticsearchRepository calls
        // ElasticsearchOperations.bulkIndex(queries), so the chunk is indexed in bulk.
        personRepository.saveAll(items);
    }
}
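The writer above leaves its listener hooks empty. As a rough illustration of when those hooks fire around a chunk write, here is a plain-Java sketch: `beforeWrite` runs first, then either `afterWrite` on success or `onWriteError` on failure. The class `WriteListenerOrderSketch` and its `writeChunk` method are hypothetical stand-ins; in a real job Spring Batch makes these calls itself and rethrows the exception after `onWriteError`, whereas this sketch swallows it so the recorded order can be returned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java illustration of the write callbacks (hypothetical names, no Spring involved).
class WriteListenerOrderSketch {

    /** Runs one chunk write and records, in order, which hooks would fire. */
    static <T> List<String> writeChunk(List<T> items, Consumer<List<T>> bulkSave) {
        List<String> events = new ArrayList<>();
        events.add("beforeWrite");
        try {
            bulkSave.accept(items); // stands in for personRepository.saveAll(items)
            events.add("afterWrite");
        } catch (RuntimeException e) {
            // The framework would rethrow here; the sketch only records the event.
            events.add("onWriteError: " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> chunk = Arrays.asList("a", "b", "c");

        // prints "[beforeWrite, afterWrite]"
        System.out.println(writeChunk(chunk, items -> { }));

        // prints "[beforeWrite, onWriteError: bulk request failed]"
        System.out.println(writeChunk(chunk, items -> {
            throw new IllegalStateException("bulk request failed");
        }));
    }
}
```

In the real writer, `onWriteError` would be a natural place to log the IDs of the items in the failed chunk.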
2.4 Configuring ElasticsearchItemReader (not used in this example; for reference only)
package com.hfcsbc.esetl.itemreader;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.util.ClassUtils;

import java.util.Iterator;

/**
 * Created by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private final ElasticsearchOperations elasticsearchOperations;
    private final SearchQuery query;
    private final Class<T> targetType;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<T> targetType) {
        // A name is needed to build the execution context keys when state is saved.
        setName(ClassUtils.getShortName(ElasticsearchItemReader.class));
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    @Override
    protected Iterator<T> doPageRead() {
        // Request the current page; otherwise every call returns the same first page.
        query.setPageable(PageRequest.of(page, pageSize));
        return elasticsearchOperations.queryForList(query, targetType).iterator();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }
}
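The paging contract that such a reader relies on can be sketched without Spring: fetch a page when the current one is exhausted, advance the page counter, and return null once a page comes back empty. The sketch below is an assumption-laden illustration of that behaviour, not the code of `AbstractPaginatedDataItemReader`; `PagedReaderSketch`, `pageQuery`, and `slice` are hypothetical names, and the in-memory list stands in for an Elasticsearch query.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java illustration of a paginated reader (hypothetical names, no Spring involved).
class PagedReaderSketch<T> {
    private final BiFunction<Integer, Integer, List<T>> pageQuery; // (page, pageSize) -> rows
    private final int pageSize;
    private int page = 0;
    private Iterator<T> current = Collections.emptyIterator();

    PagedReaderSketch(BiFunction<Integer, Integer, List<T>> pageQuery, int pageSize) {
        this.pageQuery = pageQuery;
        this.pageSize = pageSize;
    }

    /** Returns the next item, or null once a page comes back empty. */
    T read() {
        if (!current.hasNext()) {
            current = pageQuery.apply(page, pageSize).iterator();
            page++; // advance, otherwise the same page would be fetched forever
            if (!current.hasNext()) {
                return null;
            }
        }
        return current.next();
    }

    int pagesFetched() {
        return page;
    }

    /** Demo data source: slices an in-memory list the way a from/size query would. */
    static List<Integer> slice(List<Integer> all, int page, int size) {
        int from = Math.min(page * size, all.size());
        int to = Math.min(from + size, all.size());
        return all.subList(from, to);
    }

    public static void main(String[] args) {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            all.add(i);
        }
        PagedReaderSketch<Integer> reader = new PagedReaderSketch<>((p, s) -> slice(all, p, s), 3);

        List<Integer> seen = new ArrayList<>();
        Integer item;
        while ((item = reader.read()) != null) {
            seen.add(item);
        }
        // prints "[0, 1, 2, 3, 4, 5, 6] in 4 page queries"
        System.out.println(seen + " in " + reader.pagesFetched() + " page queries");
    }
}
```

Seven rows with a page size of 3 take four queries: three pages with data and one empty page that signals the end.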
2.5 Spring Batch configuration
package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.domain.Person;
import com.hfcsbc.esetl.itemwriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

/**
 * Created by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private EsPersonRepository personRepository;

    // Declared with "throws Exception" so that configuration errors fail startup
    // instead of being printed and swallowed.
    @Bean
    public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory) throws Exception {
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        // ORDER BY keeps the row order stable across pages.
        String sqlQuery = "select * from person order by id";
        JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<>();
        queryProvider.setSqlQuery(sqlQuery);
        queryProvider.setEntityClass(Person.class);
        queryProvider.afterPropertiesSet();
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setPageSize(10000);
        reader.setQueryProvider(queryProvider);
        reader.setSaveState(true);
        reader.afterPropertiesSet();
        return reader;
    }

    @Bean
    public ElasticsearchItemWriter itemWriter() {
        return new ElasticsearchItemWriter(personRepository);
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader<Person> itemReader,
                     ItemWriter<Person> itemWriter) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(10000)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
        return jobBuilderFactory
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    /**
     * Spring Batch creates some tables of its own when it runs; this bean
     * specifies where they are created: the given DataSource.
     *
     * @param dataSource the data source that holds the Spring Batch metadata tables
     * @param manager    the transaction manager used by the job repository
     * @return the job repository
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(manager);
        jobRepositoryFactoryBean.setDatabaseType("postgres");
        return jobRepositoryFactoryBean.getObject();
    }
}
2.6 Configuring the database and ES connection addresses
spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: postgresql
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2
spring.batch.initialize-schema: always
2.7 Application entry class
package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}