博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Batch 入门之 CSV-to-DB
阅读量:2168 次
发布时间:2019-05-01

本文共 10258 字,大约阅读时间需要 34 分钟。

学习使用Spring batch从CSV文件读取记录,并使用 StaxEventItemWriter 输出经过处理的记录转换为 XML 的数据。

spring-batch-reference-model.png

  1. JobLauncher: 顾名思义,该领域对象就是Job的启动器,其作用就是绑定一组JobParameters到Job上,然后运行该Job。

  2. Job: 定义,配置批处理任务的领域对象,该对象的作用,第一是做Step的容器,配置该批处理任务需要的Step,以及他们之间的逻辑关系。第二是配置该批处理任务的特征,比方说名字,是否可重启,是否对JobParameters进行验证以及验证规则等。

  3. Step: 定义批处理任务中一个对立的逻辑任务处理单元。基本上的业务逻辑处理代码都是封装在Step中的。Step有2种实现形式,一种是Tasklet形式的,这种形式非常自由,开发人员只需要实现Tasklet接口,其中的逻辑完全有自己决定,另一种是Chunk-Oriented形式的,这种形式定义了一个Step的流程必须是“读-处理(可选)-写”,当然Spring Batch也对每一个步骤提供了接口ItemReader, ItemProcessor,ItemWriter还有很多常用的默认实现(读文件,读数据库,写文件,写数据库等等)。 每一个Step只能由一个Tasklet或者一个Chunk构成。

  4. JobRepository: 该领域对象会为Spring Batch的运维数据提供一种持久化机制。其为所有的运维数据的提供CRUD的操作接口,并为所有的操作提供事务支持。

项目概述

在这个应用程序中,我们将执行以下任务:

  1. 使用 FlatFileItemReader 从CSV文件读取交易记录
  2. 使用 CustomItemProcessor 进行项目的业务处理。当 ItemReader 读取一个项目,而 ItemWriter 写入它们时,
    ItemProcessor 提供一个转换或应用其他业务处理的访问点。
  3. 使用 StaxEventItemWriter 获取 CustomItemProcessor 的处理结果,并将它转换成 XML 类型数据作为最终输出。
  4. 使用 MyBatisBatchItemWriter 获取 CustomItemProcessor 的处理结果,并将它转换成 XML 类型数据作为最终输出。
  5. 查看MySQL

工程结构

在这里插入图片描述

Maven 依赖

sqlite-jdbcmysql-connector-java 可以选择其中一个。

当选择其中一种时,同时也要在 applicationContext.xml 文件中做出相应的改动。

改动:

  • 依赖的版本由 platform-bom 来统一管理
  • 添加 mybatis, mybatis-spring
4.0.0
com.littlefxc.example
Spring-CSV-to-DB
1.0-snapshot
UTF-8
5.0.9.RELEASE
4.0.1.RELEASE
3.8.11.2
5.1.47
io.spring.platform
platform-bom
Cairo-RELEASE
pom
import
mysql
mysql-connector-java
org.springframework
spring-oxm
org.mybatis
mybatis
3.5.0
org.mybatis
mybatis-spring
2.0.0
org.springframework
spring-jdbc
org.springframework.batch
spring-batch-core
org.projectlombok
lombok

applicationContext.xml

我们将使用 FlatFileItemReader 读取 CSV 文件。

我们将使用它的标准配置,包括 DefaultLineMapperDelimitedLineTokenizerBeanWrapperFieldSetMapper 类。
为了在XML文件中输出记录,我们将使用 StaxEventItemWriter 作为标准编写器。

改动:

  • 将输出XML变为输出到mysql
  • Spring Batch 持久层框架由 spring-jdbc 改为 mybatis, mybatis-spring

当然,原来的输出 itemWriter 去掉注释后,仍然起作用

RecordFieldSetMapper

ItemReader 的属性,作用是将 FieldSet 转换为对象

package com.littlefxc.examples.batch.service;import com.littlefxc.examples.batch.model.TransactionRecord;import org.springframework.batch.item.file.mapping.FieldSetMapper;import org.springframework.batch.item.file.transform.FieldSet;import org.springframework.validation.BindException;import java.text.ParseException;import java.text.SimpleDateFormat;/** * 将 FieldSet 转换为对象 * @author fengxuechao * @date 2019/1/4 **/public class RecordFieldSetMapper implements FieldSetMapper
{
public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy"); Transaction transactionRecord = new Transaction(); transactionRecord.setUsername(fieldSet.readString("username")); transactionRecord.setUserId(fieldSet.readInt("user_id")); transactionRecord.setAmount(fieldSet.readDouble("transaction_amount")); String dateString = fieldSet.readString("transaction_date"); try {
transactionRecord.setTransactionDate(dateFormat.parse(dateString)); } catch (ParseException e) {
e.printStackTrace(); } return transactionRecord; }}

CustomItemProcessor

自定义实现接口 ItemProcessor, 作为 ItemReaderItemWriter 的转换点。

package com.littlefxc.examples.batch.service;import com.littlefxc.examples.batch.model.TransactionRecord;import org.springframework.batch.item.file.mapping.FieldSetMapper;import org.springframework.batch.item.file.transform.FieldSet;import org.springframework.validation.BindException;import java.text.ParseException;import java.text.SimpleDateFormat;/** * 将读取到的数据集合转换为对象 * @author fengxuechao * @date 2019/1/4 **/public class RecordFieldSetMapper implements FieldSetMapper
{
public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy"); Transaction transactionRecord = new Transaction(); transactionRecord.setUsername(fieldSet.readString("username")); transactionRecord.setUserId(fieldSet.readInt("user_id")); transactionRecord.setAmount(fieldSet.readDouble("transaction_amount")); String dateString = fieldSet.readString("transaction_date"); try {
transactionRecord.setTransactionDate(dateFormat.parse(dateString)); } catch (ParseException e) {
e.printStackTrace(); } return transactionRecord; }}

模型

package com.littlefxc.examples.batch.model;import lombok.Data;import javax.xml.bind.annotation.XmlRootElement;import java.util.Date;/** * @author fengxuechao */@Data@XmlRootElement(name = "transactionRecord")public class Transaction {
private String username; private int userId; private Date transactionDate; private double amount;}

record.csv

devendra, 1234, 31/10/2015, 10000john    , 2134, 3/12/2015 , 12321robin   , 2134, 2/02/2015 , 23411

启动程序

package com.littlefxc.examples.batch;import org.springframework.batch.core.Job;import org.springframework.batch.core.JobExecution;import org.springframework.batch.core.JobParameters;import org.springframework.batch.core.launch.JobLauncher;import org.springframework.context.support.ClassPathXmlApplicationContext;public class App {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(); context.setConfigLocations("classpath:spring-context.xml"); context.refresh(); JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher"); Job job = (Job) context.getBean("firstBatchJob"); System.out.println("Starting the batch job"); try {
JobExecution execution = jobLauncher.run(job, new JobParameters()); System.out.println("Job Status : " + execution.getStatus()); System.out.println("Job completed"); } catch (Exception e) {
e.printStackTrace(); System.out.println("Job failed"); } }}

验证

10000.0
2015-10-31T00:00:00+08:00
1234
devendra
12321.0
2015-12-03T00:00:00+08:00
2134
john
23411.0
2015-02-02T00:00:00+08:00
2134
robin

附录:

application.properties

batch.schema-drop=org/springframework/batch/core/schema-drop-mysql.sqlbatch.schema-create=org/springframework/batch/core/schema-mysql.sqlbatch.sql=INSERT INTO transaction_record (user_id, username, transaction_date, amount) VALUES (:userId, :username, :transactionDate, :amount)jdbc.url=jdbc:mysql://192.168.120.63:3306/batch?useSSL=falsejdbc.username=rootjdbc.password=123456jdbc.driver-class-name=com.mysql.jdbc.Driver# 自定义数据库删除脚本project.schema-drop=classpath:schema-drop.sql# 自定义数据库创建脚本project.schema-create=classpath:schema.sql# Mybatis Configmybatis.configuration=classpath:mybatis-config.xmlmybatis.type-aliases-package=com.littlefxc.examples.batch.modelmybatis.mapper.base-package=com.littlefxc.examples.batch.dao

mybatis-config.xml

schema.sql

/* Navicat Premium Data Transfer Source Server         : localhost Source Server Type    : MySQL Source Server Version : 50722 Source Host           : localhost:3306 Source Schema         : batch Target Server Type    : MySQL Target Server Version : 50722 File Encoding         : 65001 Date: 31/01/2019 10:27:20*/SET NAMES utf8mb4;SET FOREIGN_KEY_CHECKS = 0;-- ------------------------------ Table structure for transaction_record-- ----------------------------# DROP TABLE IF EXISTS `transaction_record`;CREATE TABLE `transaction_record`  (  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,  `user_id` bigint(20) NOT NULL,  `transaction_date` datetime(6) NOT NULL,  `amount` double(11, 0) NOT NULL,  PRIMARY KEY (`username`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;

schema-drop.sql

DROP TABLE IF EXISTS `transaction_record`;

转载地址:http://gvxzb.baihongyu.com/

你可能感兴趣的文章
Redis学习笔记(四)—— redis的常用命令和五大数据类型的简单使用
查看>>
Win10+VS2015编译libcurl
查看>>
Windows下使用jsoncpp
查看>>
Ubuntu下测试使用Nginx+uWsgi+Django
查看>>
Windows下编译x264
查看>>
visual studio调试内存泄漏工具
查看>>
开源Faac实现PCM编码AAC
查看>>
Windows下wave API 音频采集
查看>>
借船过河:一个据说能看穿你的人性和欲望的心理测试
查看>>
AndroidStudio 导入三方库使用
查看>>
Ubuntu解决gcc编译报错/usr/bin/ld: cannot find -lstdc++
查看>>
解决Ubuntu14.04 - 16.10版本 cheese摄像头灯亮却黑屏问题
查看>>
解决Ubuntu 64bit下使用交叉编译链提示error while loading shared libraries: libz.so.1
查看>>
Android Studio color和font设置
查看>>
Python 格式化打印json数据(展开状态)
查看>>
Centos7 安装curl(openssl)和libxml2
查看>>
Centos7 离线安装RabbitMQ,并配置集群
查看>>
Centos7 or Other Linux RPM包查询下载
查看>>
运行springboot项目出现:Type javax.xml.bind.JAXBContext not present
查看>>
Java中多线程向mysql插入同一条数据冲突问题
查看>>