实现多数据量的导入数据库

引言

在做一个项目的时候,涉及到需要从一个表格中获取百万条数据然后插入到数据库中,最后采用JDBC的executeBantch方法实现这个功能。


采取的策略

  • 尽量关闭字段索引(因为再插入数据的时候还是需要维护索引的,在创建索引和维护索引 会耗费时间,随着数据量的增加而增加,可以在插入数据后再去为字段创建索引)

    虽然索引可以提高查询速度但是,插入数据的时候会导致索性的更新。索性越多,插入会越慢。可以看文档描述:
    Although it can be tempting to create an indexes for every possible column used in a query, unnecessary indexes waste space and waste time for MySQL to determine which indexes to use. Indexes also add to the cost of inserts, updates, and deletes because each index must be updated. You must find the right balance to achieve fast queries using the optimal set of indexes.

  • 分批次提交数据

  • 在分布式条件下,还可以考虑在不同的数据库结点提交,有底层的消息系统完成数据扩展

  • 过滤预处理数据

    预处理数据的场景:为了避免插入的数据(假设ListA)跟数据库中某些数据重复,那么我们会把要插入的数据去数据库中查询是否已经存在,获得返回的已经存在数据(ListB)。然后在插入数据的时候判断当前数据是否在ListB中,那么当前数据不能够插入数据库。过滤出来,最后得到一个可以插入数据库的ListC

代码

关键代码

/*数据分析结束*/
/*往数据库写数据开始*/
Connection conn=null;
PreparedStatement idsUserAdd=null;
try {
Class.forName("com.mysql.jdbc.Driver") ;
conn = DriverManager.getConnection(ConfigTool.getProperty("jdbc.url").toString() , ConfigTool.getProperty("jdbc.username").toString() , ConfigTool.getProperty("jdbc.password").toString());
conn.setAutoCommit(false);
//构造预处理statement
idsUserAdd = conn.prepareStatement("INSERT INTO dc_matedata ("+
" ID,`NAME`, DATATYPE,`CODE`,TYPE_ID,`LENGTH`, "+
" DATANAME, VALUEAREA,`RESTRICT`, REMARK,MD_DATE)"+
" values(?,?,?,?,?,?,?,?,?,?,now())");
//最大列表的数目当做循环次数
int xhcs=addMetadataList.size();//addMetadataList需要插入的数据
for(int i=0;i<xhcs;i++){
idsUserAdd.setString(1,addMetadataList.get(i).get("id").toString());
idsUserAdd.setString(2,addMetadataList.get(i).get("name").toString());
idsUserAdd.setString(3,addMetadataList.get(i).get("dataType").toString());
idsUserAdd.setString(4,addMetadataList.get(i).get("code").toString());
idsUserAdd.setString(5,addMetadataList.get(i).get("typeId").toString());
idsUserAdd.setString(6,addMetadataList.get(i).get("dataLength").toString());
idsUserAdd.setString(7,addMetadataList.get(i).get("dataName").toString());
idsUserAdd.setString(8,addMetadataList.get(i).get("valueArea").toString());
idsUserAdd.setString(9,addMetadataList.get(i).get("dataRestrict").toString());
idsUserAdd.setString(10,addMetadataList.get(i).get("dataRemark").toString());
idsUserAdd.addBatch();
//每10000次提交一次
if(i%10000==0||i==xhcs-1){//可以设置不同的大小;如50,100,500,1000等等 i==xhcs-1(最后一次)
idsUserAdd.executeBatch();
conn.commit();
idsUserAdd.clearBatch();
}
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}finally {
try {
if(idsUserAdd!=null)
idsUserAdd.close();
if(conn!=null)
conn.close();
}catch(Exception e){
e.printStackTrace();
throw e;
}
}
/*往数据库写数据结束*/

完整代码

/**
* 校验需要导入的元数据信息,封装错误信息并批量插入数据库
*/
@Override
public List<Map<String, Object>> saveDCMetadataBatch(List<Map<String, Object>> list, boolean valid,
boolean addError) throws Exception{
List<Map<String,Object>> errorList=new ArrayList<Map<String,Object>>();//获得不能够添加成功的数据
Map<String,Object> map=new HashMap<String,Object>();//查询条件
Map<String,String> codeMap=new HashMap<String,String>();//每个分类对应的元数据的编号最大值
Map<String,Object> metaName=new HashMap<String,Object>();//查询数据库中是否存在相同的数据(这里校验的是:元数据的中文简称)
Map<String,Object> metaDataName=new HashMap<String,Object>();//查询数据库中是否存在相同的数据(这里校验的是:元数据的数据项名称)
map.put("metaName",list);//需要查询的元数据中文名称
map.put("metaDataTypeId",list);//导入的元数据的编号
List<Map<String, Object>> metaExistList = dCMatedataDao.getDCMetadata(map);//根据元数据名称查询当前分类下是否存在同样元数据
map.put("metaName",null);//置空
map.put("metaDataName",list);
List<Map<String, Object>> metaExistListTwo = dCMatedataDao.getDCMetadata(map);//根据元数据数据项名称查询存在的元数据
//保存重复的信息
for(int i=0;i<metaExistList.size();i++)
metaName.put(metaExistList.get(i).get("name").toString()+metaExistList.get(i).get("code").toString()
,metaExistList.get(i).get("id"));//添加父类的编号为后缀-唯一性保证
for(int i=0;i<metaExistListTwo.size();i++)
metaDataName.put(metaExistListTwo.get(i).get("dataname").toString()+metaExistListTwo.get(i).get("code").toString(),
metaExistListTwo.get(i).get("id"));
/*整理出来的数据-开始*/
List<Map<String,Object>> addMetadataList=new ArrayList<Map<String,Object>>();
/*整理出来的数据-结束*/
for (int i = 0; i < list.size(); i++) {
Map<String, Object> MetadataObj = list.get(i);
try {
String metadatId = StringUtil.getUUID();//元数据id
/*校验开始*/
if (valid){
if(validUser(MetadataObj,"name",addError)!=null){//验证输入的数据是否符合格式和必填。
errorList.add(MetadataObj);
continue;
}
}
/*前端校验结束*/
/*校验是否存在同名的元数据*/
String dataCodeCheck = MetadataObj.get("dataCode").toString().trim(); //元数据父分类编号
String name = MetadataObj.get("name").toString().trim();//元数据中文简称
if (metaName.containsKey(name+dataCodeCheck)) {
if (addError) {
MetadataObj.put("errInfo", "中文简称已存在");
}
errorList.add(MetadataObj);
continue;
}
/*校验是否存在相同数据项的元数据*/
String dataName = MetadataObj.get("dataName").toString().trim();//数据项名
if (metaDataName.containsKey(dataName+dataCodeCheck)) {
if (addError) {
MetadataObj.put("errInfo", "数据项名已存在");
}
errorList.add(MetadataObj);
continue;
}
String dataCode = MetadataObj.get("dataCode").toString().trim(); //元数据父分类编号
List<Map<String, Object>> footCount = dCMatedataDao.getFootCount(dataCode);
if( footCount.size() > 0){
if (addError) {
MetadataObj.put("errInfo", "分类编码不是最后一级分类");
}
errorList.add(MetadataObj);
continue;
}
Map<String, Object> typeByCode = dCMatedataDao.getMetadataTypeByCode(dataCode);
if( typeByCode == null || typeByCode.size() < 1){
if (addError) {
MetadataObj.put("errInfo", "分类编码不存在,请先添加分类");
}
errorList.add(MetadataObj);
continue;
}
//校验是在添加的List中是否存在相同的数据项名或者中文简称
//校验导入文件中是否存在一样的中文简称或者数据项名
boolean nameExist = false;
boolean dataNameExist = false;
for (int j = 0; j < addMetadataList.size(); j++){
Map<String, Object> map2 = addMetadataList.get(j);
String typeId = map2.get("typeId").toString();
String nameE = map2.get("name").toString();
String dataNameE = map2.get("dataName").toString();
if( typeId.equals(typeByCode.get("id").toString()) && nameE.equals(name)){
nameExist=true;
break;
}
if( typeId.equals(typeByCode.get("id").toString()) && dataNameE.equals(dataName)){
dataNameExist=true;
break;
}
}
if( nameExist ){
if (addError) {
MetadataObj.put("errInfo", "中文简称已存在");
}
errorList.add(MetadataObj);
continue;
}
if( dataNameExist ){
if (addError) {
MetadataObj.put("errInfo", "数据项名已存在");
}
errorList.add(MetadataObj);
continue;
}
//进入这里说明校验结束,开始填充添加的数据
String type_id = typeByCode.get("id").toString();//元数据所属分类id
String dataType = MetadataObj.get("dataType").toString().trim(); //元数据类型
String dataLength = MetadataObj.get("dataLength").toString().trim(); //元数据长度
String code = "";
////
if( codeMap.get(dataCode) == null||StringUtil.isEmpty(codeMap.get(dataCode)) ){//表示当前分类不存在已经添加的元数据--因为编码map中不存在对应分类的最大编码
Map maxCodeByPid = this.selectMetadataMaxCode(type_id);
if( maxCodeByPid == null ){//表示当前分类下不存在任何子分类
code = StringUtil.getCode("0", dataCode);//则从01开始编号
codeMap.put(dataCode, "01");//保存当前分类下元数据编号最大值
}else{
String object = (String) maxCodeByPid.get("codeNum");//当前分类节点下的元数据的编号最大值。
int pSituation = object.indexOf(dataCode);
int pLength = pSituation+dataCode.length() ;
String substring = object.substring(pLength); //截取出最大编号值得最大值
code = StringUtil.getCode(substring, dataCode);
int temp = Integer.parseInt(substring);//保存当前分类下元数据编号最大值
temp+=1;
codeMap.put(dataCode, temp+"");
}
}else{
String maxCode = codeMap.get(dataCode);
code = StringUtil.getCode(maxCode, dataCode);
//保存当前分类下元数据编号最大值
int temp = Integer.parseInt(maxCode);
temp+=1;
codeMap.put(dataCode, temp+"");
}
///
Map<String, Object> metadatList = new LinkedHashMap<String, Object>();
metadatList.put("id", metadatId);
metadatList.put("name",name);
metadatList.put("dataType",dataType);
metadatList.put("code",code);
metadatList.put("typeId",type_id);
metadatList.put("dataLength",dataLength);
metadatList.put("dataName",dataName);
metadatList.put("valueArea", MetadataObj.get("valueArea")==null?"":MetadataObj.get("valueArea") );
metadatList.put("dataRestrict",MetadataObj.get("dataRestrict")==null?"":MetadataObj.get("dataRestrict"));
metadatList.put("dataRemark",MetadataObj.get("dataRemark")==null?"":MetadataObj.get("dataRemark"));
metadatList.put("mdDate",new Date());
addMetadataList.add(metadatList);
}
catch (Exception e){
if(addError) {
MetadataObj.put("errInfo", e.getMessage());
}
errorList.add(MetadataObj);
}
}
/*数据分析结束*/
/*往数据库写数据开始*/
Connection conn=null;
PreparedStatement idsUserAdd=null;
try {
Class.forName("com.mysql.jdbc.Driver") ;
conn = DriverManager.getConnection(ConfigTool.getProperty("jdbc.url").toString() , ConfigTool.getProperty("jdbc.username").toString() , ConfigTool.getProperty("jdbc.password").toString());
conn.setAutoCommit(false);
//构造预处理statement
idsUserAdd = conn.prepareStatement("INSERT INTO dc_matedata ("+
" ID,`NAME`, DATATYPE,`CODE`,TYPE_ID,`LENGTH`, "+
" DATANAME, VALUEAREA,`RESTRICT`, REMARK,MD_DATE)"+
" values(?,?,?,?,?,?,?,?,?,?,now())");
//最大列表的数目当做循环次数
int xhcs=addMetadataList.size();
for(int i=0;i<xhcs;i++){
idsUserAdd.setString(1,addMetadataList.get(i).get("id").toString());
idsUserAdd.setString(2,addMetadataList.get(i).get("name").toString());
idsUserAdd.setString(3,addMetadataList.get(i).get("dataType").toString());
idsUserAdd.setString(4,addMetadataList.get(i).get("code").toString());
idsUserAdd.setString(5,addMetadataList.get(i).get("typeId").toString());
idsUserAdd.setString(6,addMetadataList.get(i).get("dataLength").toString());
idsUserAdd.setString(7,addMetadataList.get(i).get("dataName").toString());
idsUserAdd.setString(8,addMetadataList.get(i).get("valueArea").toString());
idsUserAdd.setString(9,addMetadataList.get(i).get("dataRestrict").toString());
idsUserAdd.setString(10,addMetadataList.get(i).get("dataRemark").toString());
idsUserAdd.addBatch();
//每10000次提交一次
if(i%10000==0||i==xhcs-1){//可以设置不同的大小;如50,100,500,1000等等
idsUserAdd.executeBatch();
conn.commit();
idsUserAdd.clearBatch();
}
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}finally {
try {
if(idsUserAdd!=null)
idsUserAdd.close();
if(conn!=null)
conn.close();
}catch(Exception e){
e.printStackTrace();
throw e;
}
}
/*往数据库写数据结束*/
return errorList;
}

总结

有些网友发现使用StringBuffer 来拼接入参,不通过prepareStatement的预处理,虽然前者速度很快,但是使用prepareStatement可以防止SQL注入


有的好的建议大家都可以提出来

如果你感觉文章对你又些许感悟,你可以支持我!!
-------------本文结束感谢您的阅读-------------