Bulk importing data into Elasticsearch with Spring Batch

wiselyman


    1. Introduction

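    When a large amount of data has to be imported from a database into Elasticsearch, Spring Batch can make the import more efficient: an ItemReader reads the data page by page, and an ItemWriter writes it in batches. Spring Batch does not ship an ItemWriter or ItemReader for Elasticsearch, so this example defines a custom ElasticsearchItemWriter (plus an ElasticsearchItemReader) for the bulk import.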
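The read/write split can be pictured with a small, framework-free model. This is only an illustration under simplifying assumptions:

- `ChunkModel`, `ListReader` and `process` are hypothetical names.
- Items come from an in-memory list rather than a database.
- There are no transactions, retries or restart state.

It shows the core idea: the reader hands back one item per call until it returns `null`, and the writer receives whole chunks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Simplified, framework-free model of chunk-oriented processing.
 * Hypothetical names; this is not Spring Batch's API or implementation.
 */
public class ChunkModel {

    /** Returns one item per read() call and null at the end of the input. */
    static class ListReader<T> {
        private final List<T> source;
        private int index = 0;

        ListReader(List<T> source) {
            this.source = source;
        }

        T read() {
            return index < source.size() ? source.get(index++) : null;
        }
    }

    /** Collects up to chunkSize items, hands each chunk to the writer in one call, and returns the number of chunks. */
    static <T> int process(ListReader<T> reader, int chunkSize, Consumer<List<T>> writer) {
        int chunks = 0;
        while (true) {
            List<T> chunk = new ArrayList<>(chunkSize);
            T item;
            while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                chunk.add(item);
            }
            if (chunk.isEmpty()) {
                return chunks; // input exhausted
            }
            writer.accept(chunk); // one batch write per chunk
            chunks++;
        }
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            rows.add(i);
        }
        List<Integer> chunkSizes = new ArrayList<>();
        int chunks = process(new ListReader<>(rows), 10, chunk -> chunkSizes.add(chunk.size()));
        System.out.println(chunks + " chunks: " + chunkSizes); // 3 chunks: [10, 10, 5]
    }
}
```

With 25 input rows and a chunk size of 10, the writer is called three times, with 10, 10 and 5 items. The job configured later in this article uses 10000 for both the page size and the chunk size, so each writer call carries up to 10000 documents.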

     

    Source code:

    2. Example

    2.1 pom.xml

    This article connects to ES with Spring Data Jest (Spring Data Elasticsearch would work as well). The ES version is 5.5.3.

    
    
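    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.hfcsbc.estl</groupId>
        <artifactId>es-etl</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>es-etl</name>
        <description>Demo project for Spring Boot</description>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.M7</version>
            <relativePath/>
        </parent>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-jpa</artifactId>
            </dependency>
            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-batch</artifactId>
            </dependency>
            <dependency>
                <groupId>com.github.vanroy</groupId>
                <artifactId>spring-boot-starter-data-jest</artifactId>
                <version>3.0.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>io.searchbox</groupId>
                <artifactId>jest</artifactId>
                <version>5.3.2</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
        <repositories>
            <repository>
                <id>spring-snapshots</id>
                <name>Spring Snapshots</name>
                <url>https://repo.spring.io/snapshot</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </repository>
            <repository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
        <pluginRepositories>
            <pluginRepository>
                <id>spring-snapshots</id>
                <name>Spring Snapshots</name>
                <url>https://repo.spring.io/snapshot</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </pluginRepository>
            <pluginRepository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </pluginRepository>
        </pluginRepositories>
    </project>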
    
    

    2.2 Entity classes and repositories

    package com.hfcsbc.esetl.domain;
    import lombok.Data;
    import org.springframework.data.elasticsearch.annotations.Document;
    import org.springframework.data.elasticsearch.annotations.Field;
    import org.springframework.data.elasticsearch.annotations.FieldType;
    import javax.persistence.Entity;
    import javax.persistence.Id;
    import javax.persistence.OneToOne;
    /**
     * Created by pengchao on 2018/2/23
     */
    // Index "person": 1 shard, 0 replicas; refreshInterval = "-1" disables periodic refresh during the bulk load
    @Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
    @Entity
    @Data
    public class Person {
        @Id
        private Long id;
        private String name;
        // A one-to-one association in JPA, stored as a nested object in Elasticsearch
        @OneToOne
        @Field(type = FieldType.Nested)
        private Address address;
    }
    
    package com.hfcsbc.esetl.domain;
    import lombok.Data;
    import javax.persistence.Entity;
    import javax.persistence.Id;
    /**
     * Created by pengchao on 2018/2/23
     */
    @Entity
    @Data
    public class Address {
        @Id
        private Long id;
        private String name;
    }
    
    package com.hfcsbc.esetl.repository.jpa;
    import com.hfcsbc.esetl.domain.Person;
    import org.springframework.data.jpa.repository.JpaRepository;
    /**
     * Created by pengchao on 2018/2/23
     */
    public interface PersonRepository extends JpaRepository<Person, Long> {
    }
    
    package com.hfcsbc.esetl.repository.es;
    import com.hfcsbc.esetl.domain.Person;
    import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
    /**
     * Created by pengchao on 2018/2/23
     */
    public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
    }
    

    2.3 Configuring ElasticsearchItemWriter

    package com.hfcsbc.esetl.itemwriter;
    import com.hfcsbc.esetl.domain.Person;
    import com.hfcsbc.esetl.repository.es.EsPersonRepository;
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.ItemWriteListener;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.StepExecutionListener;
    import org.springframework.batch.item.ItemWriter;
    import java.util.List;
    /**
     * Created by pengchao on 2018/2/23
     */
    public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {
        private final EsPersonRepository personRepository;
        public ElasticsearchItemWriter(EsPersonRepository personRepository) {
            this.personRepository = personRepository;
        }
        @Override
        public void beforeWrite(List<? extends Person> items) {
        }
        @Override
        public void afterWrite(List<? extends Person> items) {
        }
        @Override
        public void onWriteError(Exception exception, List<? extends Person> items) {
        }
        @Override
        public void beforeStep(StepExecution stepExecution) {
        }
        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            // Returning null leaves the step's exit status unchanged
            return null;
        }
        @Override
        public void write(List<? extends Person> items) throws Exception {
            // saveAll in the implementing class AbstractElasticsearchRepository calls
            // elasticsearchOperations.bulkIndex(queries), i.e. the whole chunk is bulk-indexed
            personRepository.saveAll(items);
        }
    }
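To make "bulk indexing" concrete, the sketch below assembles the kind of request body accepted by the Elasticsearch 5.x `_bulk` API. Each document contributes one action line and one source line, and every line ends with a newline. `BulkBodySketch`, `PersonDoc` and `buildBulkBody` are hypothetical names used only for illustration. Spring Data and Jest build and send this request themselves, and the document JSON they produce may differ; for example, it would also contain the nested `address`.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Illustrative sketch of an Elasticsearch 5.x _bulk request body for one chunk.
 * Hypothetical helper; not Spring Data or Jest code.
 */
public class BulkBodySketch {

    /** Minimal stand-in for the Person entity (id and name only). */
    static final class PersonDoc {
        final long id;
        final String name;

        PersonDoc(long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Quotes a string as a JSON string literal, escaping the characters JSON requires. */
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** One action line plus one source line per document; the body must end with a newline. */
    static String buildBulkBody(String index, String type, List<PersonDoc> docs) {
        StringBuilder body = new StringBuilder();
        for (PersonDoc p : docs) {
            body.append("{\"index\":{\"_index\":").append(jsonString(index))
                .append(",\"_type\":").append(jsonString(type))
                .append(",\"_id\":\"").append(p.id).append("\"}}\n");
            body.append("{\"id\":").append(p.id)
                .append(",\"name\":").append(jsonString(p.name)).append("}\n");
        }
        return body.toString();
    }

    public static void main(String[] args) {
        List<PersonDoc> chunk = Arrays.asList(new PersonDoc(1, "Alice"), new PersonDoc(2, "Bob"));
        // Sending this body to POST /_bulk indexes both documents in a single HTTP request
        System.out.print(buildBulkBody("person", "person", chunk));
    }
}
```

The point of the format is that a whole chunk travels in one HTTP round trip instead of one request per document.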
    

    2.4 Configuring ElasticsearchItemReader (not used in this example; for reference only)

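    package com.hfcsbc.esetl.itemreader;
    import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.data.domain.PageRequest;
    import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
    import org.springframework.data.elasticsearch.core.query.SearchQuery;
    import org.springframework.util.Assert;
    import org.springframework.util.ClassUtils;
    import java.util.Iterator;
    /**
     * Created by pengchao on 2018/2/24
     */
    public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {
        private final ElasticsearchOperations elasticsearchOperations;
        private final SearchQuery query;
        private final Class<? extends T> targetType;
        public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
            // The name prefixes the keys this reader stores in the ExecutionContext
            setName(ClassUtils.getShortName(getClass()));
            this.elasticsearchOperations = elasticsearchOperations;
            this.query = query;
            this.targetType = targetType;
        }
        @Override
        @SuppressWarnings("unchecked")
        protected Iterator<T> doPageRead() {
            // Ask for the current page; otherwise every call returns the first page again and the reader never ends
            query.setPageable(PageRequest.of(page, pageSize));
            return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
        }
        @Override
        public void afterPropertiesSet() throws Exception {
            Assert.notNull(elasticsearchOperations, "An ElasticsearchOperations implementation is required");
            Assert.notNull(query, "A query is required");
            Assert.notNull(targetType, "A target type is required");
        }
    }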
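A paginated reader keeps serving items from the current page. It asks the backend for the next page only when that page is exhausted, and an empty page ends the input. The model below shows why each request has to carry the page number as an offset (`from = page * pageSize`): if every request returned the first page again, the reader would never see an empty page and would never stop. It is framework-free under stated assumptions: `PagingModel`, `PagedReader` and `search` are hypothetical names, and an in-memory sorted list stands in for the index.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Framework-free model of a paginated reader using from/size paging.
 * Hypothetical names; not Spring Batch or Spring Data code.
 */
public class PagingModel {

    /** Stand-in for a search backend returning hits [from, from + size) of a sorted result set. */
    static List<Integer> search(List<Integer> index, int from, int size) {
        if (from >= index.size()) {
            return Collections.emptyList();
        }
        return index.subList(from, Math.min(from + size, index.size()));
    }

    static class PagedReader {
        private final List<Integer> index;
        private final int pageSize;
        private int page = 0;
        private Iterator<Integer> current = Collections.emptyIterator();

        PagedReader(List<Integer> index, int pageSize) {
            this.index = index;
            this.pageSize = pageSize;
        }

        /** Returns the next item, or null once a page request comes back empty. */
        Integer read() {
            if (!current.hasNext()) {
                // The page number feeds into the offset of the next request
                current = search(index, page * pageSize, pageSize).iterator();
                page++;
                if (!current.hasNext()) {
                    return null;
                }
            }
            return current.next();
        }
    }

    public static void main(String[] args) {
        List<Integer> index = new ArrayList<>();
        for (int i = 0; i < 23; i++) {
            index.add(i);
        }
        PagedReader reader = new PagedReader(index, 10);
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        System.out.println(count + " items in " + reader.page + " page requests"); // 23 items in 4 page requests
    }
}
```

With 23 documents and a page size of 10, this makes four requests, and the last one returns nothing. Elasticsearch caps `from + size` through the `index.max_result_window` setting (10000 by default). Plain from/size paging like this therefore reaches only the first 10000 hits unless that setting is raised; the scroll API is the usual alternative for deeper reads.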
    

    2.5 Spring Batch configuration

    package com.hfcsbc.esetl.config;
    import com.hfcsbc.esetl.domain.Person;
    import com.hfcsbc.esetl.itemwriter.ElasticsearchItemWriter;
    import com.hfcsbc.esetl.repository.es.EsPersonRepository;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.JpaPagingItemReader;
    import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.transaction.PlatformTransactionManager;
    import javax.persistence.EntityManagerFactory;
    import javax.sql.DataSource;
    /**
     * Created by pengchao on 2018/2/23
     */
    @Configuration
    @EnableBatchProcessing
    public class BatchConfig {
        @Autowired
        private EsPersonRepository personRepository;
        @Bean
        public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory) throws Exception {
            // A stable ORDER BY keeps the LIMIT/OFFSET pages from overlapping or skipping rows
            String sqlQuery = "select * from person order by id";
            JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Person.class);
            queryProvider.afterPropertiesSet();
            JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
            reader.setEntityManagerFactory(entityManagerFactory);
            reader.setPageSize(10000);   // rows fetched per query
            reader.setQueryProvider(queryProvider);
            reader.setSaveState(true);   // keep the read position in the ExecutionContext for restarts
            reader.afterPropertiesSet();
            return reader;
        }
        @Bean
        public ElasticsearchItemWriter itemWriter() {
            return new ElasticsearchItemWriter(personRepository);
        }
        @Bean
        public Step step(StepBuilderFactory stepBuilderFactory,
                         ItemReader<Person> itemReader,
                         ItemWriter<Person> itemWriter) {
            return stepBuilderFactory
                    .get("step1")
                    .<Person, Person>chunk(10000)   // items per chunk, i.e. per write() call and transaction
                    .reader(itemReader)
                    .writer(itemWriter)
                    .build();
        }
        @Bean
        public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
            return jobBuilderFactory
                    .get("importJob")
                    .incrementer(new RunIdIncrementer())
                    .flow(step)
                    .end()
                    .build();
        }
        /**
         * When it runs, Spring Batch needs a set of tables of its own;
         * this bean specifies where those tables live: the given DataSource.
         * @param dataSource the data source holding the Spring Batch tables
         * @param manager the transaction manager
         * @return the job repository
         */
        @Bean
        public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager) throws Exception {
            JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
            jobRepositoryFactoryBean.setDataSource(dataSource);
            jobRepositoryFactoryBean.setTransactionManager(manager);
            jobRepositoryFactoryBean.setDatabaseType("postgres");
            // Initialize the factory, then build the repository; failures propagate instead of producing a null bean
            jobRepositoryFactoryBean.afterPropertiesSet();
            return jobRepositoryFactoryBean.getObject();
        }
    }
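The job is built with `RunIdIncrementer` for a specific reason. Spring Batch identifies a job instance by the job name plus its identifying parameters, and a completed instance cannot be launched again with the same parameters. The sketch below models the incrementer's effect with a plain `Map` standing in for `JobParameters`. `RunIdModel` and `next` are hypothetical names, and this is a simplification, not Spring Batch's implementation. Each launch copies the previous parameters and adds one to `run.id`, so every run of the import becomes a new instance.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model of RunIdIncrementer's effect: copy the previous parameters
 * and bump "run.id" by one. A Map stands in for JobParameters (assumption).
 */
public class RunIdModel {

    static final String KEY = "run.id";

    static Map<String, Long> next(Map<String, Long> previous) {
        Map<String, Long> params = new HashMap<>();
        if (previous != null) {
            params.putAll(previous); // keep the other parameters unchanged
        }
        long id = params.getOrDefault(KEY, 0L) + 1;
        params.put(KEY, id);
        return params;
    }

    public static void main(String[] args) {
        Map<String, Long> first = next(null);
        Map<String, Long> second = next(first);
        System.out.println(first.get(KEY) + " -> " + second.get(KEY)); // 1 -> 2
    }
}
```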
    

    2.6 Configuring the database and ES connection addresses

    spring:
      redis:
        host: 192.168.1.222
      data:
        jest:
          uri: http://192.168.1.222:9200
          username: elastic
          password: changeme
      jpa:
        database: postgresql
        show-sql: true
        hibernate:
          ddl-auto: update
      datasource:
        platform: postgres
        url: jdbc:postgresql://192.168.1.222:5433/person
        username: hfcb
        password: hfcb
        driver-class-name: org.postgresql.Driver
        hikari:
          maximum-pool-size: 2
      batch:
        # create the Spring Batch metadata tables on startup
        initialize-schema: always
    

    2.7 Configuring the application entry class

    package com.hfcsbc.esetl;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
    import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
    import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
    import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
    // Turn off Spring Boot's own Elasticsearch auto-configuration; the ES connection comes from spring-boot-starter-data-jest
    @SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
    @EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
    @EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
    public class EsEtlApplication {
        public static void main(String[] args) {
            SpringApplication.run(EsEtlApplication.class, args);
        }
    }

     

     