论坛首页 Java企业应用论坛

jdbc 多线程公用一个connection PreparedStatement

浏览 23597 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (1) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-04-07   最后修改:2011-04-07

 


 最近做一个数据下载接口,功能是这样的:拿n个基金的id,轮训接口下载每一个基金id的详细回来录入自己的数据库。n>10000, 所以采用多线程下载入库。ok。。问题来了。当我采用200根线程同时做这个事情的时候。 调用接口没有问题,在入库的时候connection的创建我有两种想法:

第一:200根线程公用一个connection和PreparedStatement。因为我觉得200根线程的入库操作都是一样的。sql语句都一样。 唯一不同的是给sql参数赋值的过程。(也许会有人说。是否会有线程安全的问题。实践证明并没有引发异常。。。这也是我的一个疑问。 求解。。。。),还有一方面的考虑是,频繁的打开,关闭连接也是会造成了一定的内存消耗。

第二:用线程池管理connection,每根线程拥有自己的connection。(这个不用说,大部分首先都会考虑用线程池,但这里也会有一个问题。当启动200根线程,而连接池中供我的连接假设只有50个,那么也就是意味着有150根线程处于等的状态,这就照成整个接口全部执行完消耗的时间增大。)

 

希望有人能为我解答上面两种方案的优劣。!

代码是这样的。

 

public class DownloadTask implements Runnable {
	
	private final Connection con;
	

	public DownloadTask(Connection con) {
	      this.con = con;
	
	}
public void run() {
		
//		Long downloadLong = System.currentTimeMillis();
		List<MonitorEntity> monitorEntitys = new ArrayList<MonitorEntity>();
//调用接口下载详细
		for (String id : idsList) {
			MonitorEntity entity = getMonitorEntity(id);
			monitorEntitys.add(entity);
		}
//		System.out.println("i ="+i+" download api over "+(System.currentTimeMillis()-downloadLong)/1000+" s");
		Long timeLong = System.currentTimeMillis();
	                Connection connection = con;
		PreparedStatement ps = null;
		try {
//			connection.setAutoCommit(false);
			ps = connection.prepareStatement(sqlString);
			for (MonitorEntity entity  : monitorEntitys) {	
			
					ps.setString(1, entity.getApi_name());
					ps.setString(2, entity.getId());
					ps.setString(3, entity.getId_type());
					ps.setString(4, entity.getRequest_url());
					ps.setInt(5, entity.getHttp_code());
					ps.setInt(6, entity.getRes_code());
					ps.setInt(7, entity.getRes_time());
					ps.setString(8, entity.getRes_msg());
					ps.setString(9, entity.getResult());
					ps.setString(10, entity.getLast_update_time());
					ps.setString(11, entity.getRequest_url());
					ps.setInt(12, entity.getHttp_code());
					ps.setInt(13, entity.getRes_code());
					ps.setInt(14, entity.getRes_time());
					ps.setString(15, entity.getRes_msg());
					ps.setString(16, entity.getResult());
					ps.setString(17, entity.getLast_update_time());
					ps.addBatch();
			}
			ps.executeBatch();
//			connection.commit(); 
		} catch (Exception e) {
			log.error("insert or update database error", e);
		} finally {
			jdbcUtil.closePreparedStatement(ps);
					}
		System.out.println("i ="+i+" insertOrUpdate database over "+(System.currentTimeMillis()-timeLong)/1000+" s");
		countdown.countDown();
	}

 

 

一个类实现Runnable接口,在这个这个类构造函数中增加一个参数Connection con,然后再主线程 new 子线程的时候吧Connection 作为参数传递进来。

 这里只是公用一个connection, 效率很慢,一部分线程只用不到1s钟就执行完了。 但越到后面的线程执行的时间却越长,我个人认为是一个connection只能new出一定的PreparedStatement ,所以导致后面的线程必须等前面的线程执行完,释放掉PreparedStatement ,才能创建新的PreparedStatement 。不知道这样认为是否正确。

后来我想,如果是这样干脆把PreparedStatement 作为参数传递进来。是否能快呢?

代码如下:

 

 

public class DownloadTask implements Runnable {
	static Log log = LogFactory.getLog(DownloadTask.class);
	private final JDBCUtil jdbcUtil;
	private final Connection con;
	private final List<String> idsList;
	private final String url_head;
	private final String url_param;
	private final String sqlString;
	private final String api_name;
	private final String id_type;
	CountDownLatch countdown;
	private final PreparedStatement ps;
	private final int i;
	static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

	public DownloadTask(JDBCUtil jdbcUtil, Connection con,
			List<String> idsList, String url_head, String url_param,
			String sqlString, String api_name, String id_type,CountDownLatch countdown,int i,PreparedStatement ps) {
		this.jdbcUtil = jdbcUtil;
		this.con = con;
		this.idsList = idsList;
		this.url_head = url_head;
		this.url_param = url_param;
		this.sqlString = sqlString;
		this.api_name = api_name;
		this.id_type = id_type;
		this.countdown = countdown;
		this.i = i ;
		this.ps = ps;
	}

	private MonitorEntity getMonitorEntity(String id) {
		MonitorEntity entity = new MonitorEntity();
		String url = url_head + id + url_param;
		Long startTimeLong = System.currentTimeMillis();
		String[] responseString;
		try {
			responseString = DownloadData.download(url);
		} catch (Exception e) {
			log.error("download error url="+url,e);
			return null;
		}
		Long endTimeLong = System.currentTimeMillis();
		Long restimeLong = endTimeLong - startTimeLong;
		entity.setId(id);
		entity.setId_type(id_type);
		entity.setApi_name(api_name);
		entity.setRequest_url(url);
		entity.setHttp_code(Integer.parseInt(responseString[0]));
		entity.setRes_time(restimeLong.intValue());
		String resultString = responseString[1];
		try {
			
			JSONObject jsonObject = JSONObject.fromObject(resultString);

			JSONObject statusJsonObject = jsonObject.getJSONObject("status");
			entity.setRes_code(statusJsonObject.getInt("code"));
			entity.setRes_msg(statusJsonObject.getString("message"));
			entity.setResult(jsonObject.getJSONObject("data").toString());
		} catch (Exception e) {
			log.error("analysis jsonobject error ", e);
			entity.setRes_code(-1);
			entity.setRes_msg("analysis jsonobject error");
			entity.setResult("error");
		}
		
		
		entity.setLast_update_time(sf.format(new Date()));
		return entity;
	}

	public void run() {
		Long timeLong = System.currentTimeMillis();
//		Long downloadLong = System.currentTimeMillis();
		List<MonitorEntity> monitorEntitys = new ArrayList<MonitorEntity>();
		for (String id : idsList) {
			MonitorEntity entity = getMonitorEntity(id);
			monitorEntitys.add(entity);
		}
//		System.out.println("i ="+i+" download api over "+(System.currentTimeMillis()-downloadLong)/1000+" s");
		
	  //  Connection connection = jdbcUtil.getConnection();
		//PreparedStatement ps = null;
		try {
//			connection.setAutoCommit(false);
			//ps = connection.prepareStatement(sqlString);
			for (MonitorEntity entity  : monitorEntitys) {	
			
					ps.setString(1, entity.getApi_name());
					ps.setString(2, entity.getId());
					ps.setString(3, entity.getId_type());
					ps.setString(4, entity.getRequest_url());
					ps.setInt(5, entity.getHttp_code());
					ps.setInt(6, entity.getRes_code());
					ps.setInt(7, entity.getRes_time());
					ps.setString(8, entity.getRes_msg());
					ps.setString(9, entity.getResult());
					ps.setString(10, entity.getLast_update_time());
					ps.setString(11, entity.getRequest_url());
					ps.setInt(12, entity.getHttp_code());
					ps.setInt(13, entity.getRes_code());
					ps.setInt(14, entity.getRes_time());
					ps.setString(15, entity.getRes_msg());
					ps.setString(16, entity.getResult());
					ps.setString(17, entity.getLast_update_time());
					ps.addBatch();
			}
			ps.executeBatch();
//			connection.commit(); 
		} catch (Exception e) {
			log.error("insert or update database error", e);
		} finally {
			//jdbcUtil.closePreparedStatement(ps);//由主线程控制关闭
			//jdbcUtil.closeConnection(connection);//由主线程控制关闭
		}
		System.out.println("i ="+i+" insertOrUpdate database over "+(System.currentTimeMillis()-timeLong)/1000+" s");
		countdown.countDown();
	}
}

 

 

测试结果是入库的时间快了一倍,每一个线程的入库时间基本相同。(如果用多线程一部分线程回慢,因为要等需要的connection)。。

求解释。。。 多线程下公用一个PreparedStatement ,是否回引发线程安全的问题。理论上上面应该会引发才对。但运行结果。。却没有。。。 还有一个隐形问题。。 多线程公用一个PreparedStatement  ,在赋值时是否会有问题。比如:A线程吧值赋到B线程去了。 反正异常是没有报。

 

主线程代码如下:

package com.morningstar.api;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.morningstar.api.ApiConfig;
import com.morningstar.api.util.JDBCParameterCallBack;
import com.morningstar.api.util.JDBCUtil;

public class Application {
	static Log log = LogFactory.getLog(Application.class);
	private static String sqlString = "(api_name,id,id_type,request_url,http_code,state_code,response_time,state_msg,result,last_update) "
			+ "value(?,?,?,?,?,?,?,?,?,?)"
			+ " on duplicate key update request_url=?,http_code=?,state_code=?,response_time=?,"
			+ "state_msg=?,result=?,last_update=?";


	public static void main(String[] args) throws Exception {
		ApiConfig apiConfig = new ApiConfig();

		
		String url_param = "?username=" + apiConfig.getUsername()
				+ "&password=" + apiConfig.getPassword() + "&format=json";
		//获取ids集合
		Map<String, List<String>> idTypeMap = apiConfig.getIdsMap();
		//任务数
		int taskCount = apiConfig.getTask_count();

		JDBCUtil jdbcUtil = new JDBCUtil(apiConfig.getJdbc_url(),
				apiConfig.getJdbc_name(), apiConfig.getJdbc_pass(),
				apiConfig.getJdbc_driver(), apiConfig.getMax_conn(),
				apiConfig.getMin_conn());
		Connection con = null;
		//API名字集合
		List<String> apiNames = apiConfig.getApi_names();
		//获取创建表sql
		String createTableSql = apiConfig.getCreateTableSql();
		Connection connection = jdbcUtil.getConnection();
		for (String apiname : apiNames) {
			String url_head = apiConfig.getApi_url() + apiname + "/";
			
			jdbcUtil.insertOrUpdate("CREATE TABLE IF NOT EXISTS t_monitor_"+apiname+createTableSql);
			String sql = "insert t_monitor_"+apiname+sqlString;
			PreparedStatement ps = null;
			ps = connection.prepareStatement(sql);
			for (String idtype : idTypeMap.keySet()) {
				Long starttimeLong = System.currentTimeMillis();
				List<String> idsList = idTypeMap.get(idtype);
				log.info("start download  "+apiname+" " + idtype + " list size:"
						+ idsList.size());
				int number = idsList.size() / taskCount;
				int rem = idsList.size() % taskCount;
				if (rem != 0) {
					taskCount++;
				}
				CountDownLatch countdown = new CountDownLatch(taskCount);
				for (int i = 0; i < taskCount; i++) {

					int startIndex = i * number;
					int endIndex = (i + 1) * number > idsList.size() ? idsList
							.size() : (i + 1) * number;
					List<String> ids = idsList.subList(startIndex, endIndex);
					
					DownloadTask downloadTask = new DownloadTask(jdbcUtil, con,
							ids, url_head + idtype + "/", url_param, sql,
							apiname, idtype, countdown, i,ps);
					Thread thread = new Thread(downloadTask);
					thread.start();
				}
				try {
					long timeout = 60;
					countdown.await(timeout, TimeUnit.MINUTES);
				} catch (InterruptedException e) {
					log.error("thread is interrupted", e);
				}
				Long endtimeLong = System.currentTimeMillis();
				log.info("end download  "+apiname+" "  + idtype + " list size:" + idsList.size()
						+ " over time:" + (endtimeLong - starttimeLong) / 1000
						+ "s");

			}
			jdbcUtil.closePreparedStatement(ps);
		}
		jdbcUtil.closeConnection(connection);
	}
}

 

   发表时间:2011-04-07  
你给出操作DownloadTask的代码
0 请登录后投票
   发表时间:2011-04-07  
kanny87929 写道
你给出操作DownloadTask的代码

?????上面就DownloadTask的代码呀。  主线程代码如下:
package com.morningstar.api;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.morningstar.api.ApiConfig;
import com.morningstar.api.util.JDBCParameterCallBack;
import com.morningstar.api.util.JDBCUtil;

public class Application {
static Log log = LogFactory.getLog(Application.class);
private static String sqlString = "(api_name,id,id_type,request_url,http_code,state_code,response_time,state_msg,result,last_update) "
+ "value(?,?,?,?,?,?,?,?,?,?)"
+ " on duplicate key update request_url=?,http_code=?,state_code=?,response_time=?,"
+ "state_msg=?,result=?,last_update=?";


public static void main(String[] args) throws Exception {
ApiConfig apiConfig = new ApiConfig();


String url_param = "?username=" + apiConfig.getUsername()
+ "&password=" + apiConfig.getPassword() + "&format=json";
//获取ids集合
Map<String, List<String>> idTypeMap = apiConfig.getIdsMap();
//任务数
int taskCount = apiConfig.getTask_count();

JDBCUtil jdbcUtil = new JDBCUtil(apiConfig.getJdbc_url(),
apiConfig.getJdbc_name(), apiConfig.getJdbc_pass(),
apiConfig.getJdbc_driver(), apiConfig.getMax_conn(),
apiConfig.getMin_conn());
Connection con = null;
//API名字集合
List<String> apiNames = apiConfig.getApi_names();
//获取创建表sql
String createTableSql = apiConfig.getCreateTableSql();
Connection connection = jdbcUtil.getConnection();
for (String apiname : apiNames) {
String url_head = apiConfig.getApi_url() + apiname + "/";

jdbcUtil.insertOrUpdate("CREATE TABLE IF NOT EXISTS t_monitor_"+apiname+createTableSql);
String sql = "insert t_monitor_"+apiname+sqlString;
PreparedStatement ps = null;
ps = connection.prepareStatement(sql);
for (String idtype : idTypeMap.keySet()) {
Long starttimeLong = System.currentTimeMillis();
List<String> idsList = idTypeMap.get(idtype);
log.info("start download  "+apiname+" " + idtype + " list size:"
+ idsList.size());
int number = idsList.size() / taskCount;
int rem = idsList.size() % taskCount;
if (rem != 0) {
taskCount++;
}
CountDownLatch countdown = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {

int startIndex = i * number;
int endIndex = (i + 1) * number > idsList.size() ? idsList
.size() : (i + 1) * number;
List<String> ids = idsList.subList(startIndex, endIndex);

DownloadTask downloadTask = new DownloadTask(jdbcUtil, con,
ids, url_head + idtype + "/", url_param, sql,
apiname, idtype, countdown, i,ps);
Thread thread = new Thread(downloadTask);
thread.start();
}
try {
long timeout = 60;
countdown.await(timeout, TimeUnit.MINUTES);
} catch (InterruptedException e) {
log.error("thread is interrupted", e);
}
Long endtimeLong = System.currentTimeMillis();
log.info("end download  "+apiname+" "  + idtype + " list size:" + idsList.size()
+ " over time:" + (endtimeLong - starttimeLong) / 1000
+ "s");

}
jdbcUtil.closePreparedStatement(ps);
}
jdbcUtil.closeConnection(connection);
}
}
0 请登录后投票
   发表时间:2011-04-07  
//任务数  
34.        int taskCount = apiConfig.getTask_count();  
这个是什么我不是很明白
0 请登录后投票
   发表时间:2011-04-07   最后修改:2011-04-07
引用

 Connection connection = jdbcUtil.getConnection();   
46.        for (String apiname : apiNames) {   
47.            String url_head = apiConfig.getApi_url() + apiname + "/";   
48.               
49.            jdbcUtil.insertOrUpdate("CREATE TABLE IF NOT EXISTS t_monitor_"+apiname+createTableSql);   
50.            String sql = "insert t_monitor_"+apiname+sqlString;   
51.            PreparedStatement ps = null;   
52.            ps = connection.prepareStatement(sql);   
53.            for (String idtype : idTypeMap.keySet()) {   
54.                Long starttimeLong = System.currentTimeMillis();   
55.                List<String> idsList = idTypeMap.get(idtype);   
56.                log.info("start download  "+apiname+" " + idtype + " list size:"  
57.                        + idsList.size());   
58.                int number = idsList.size() / taskCount;   
59.                int rem = idsList.size() % taskCount;   
60.                if (rem != 0) {   
61.                    taskCount++;   
62.                }   
63.                CountDownLatch countdown = new CountDownLatch(taskCount);   
64.                for (int i = 0; i < taskCount; i++) {   
65.  
66.                    int startIndex = i * number;   
67.                    int endIndex = (i + 1) * number > idsList.size() ? idsList   
68.                            .size() : (i + 1) * number;   
69.                    List<String> ids = idsList.subList(startIndex, endIndex);   
70.                       
71.                    DownloadTask downloadTask = new DownloadTask(jdbcUtil, con,   
72.                            ids, url_head + idtype + "/", url_param, sql,   
73.                            apiname, idtype, countdown, i,ps);   
74.                    Thread thread = new Thread(downloadTask);   
75.                    thread.start();   
76.                }   
77.                try {   
78.                    long timeout = 60;   
79.                    countdown.await(timeout, TimeUnit.MINUTES);   
80.                } catch (InterruptedException e) {   
81.                    log.error("thread is interrupted", e);   
82.                }   
83.                Long endtimeLong = System.currentTimeMillis();   
84.                log.info("end download  "+apiname+" "  + idtype + " list size:" + idsList.size()   
85.                        + " over time:" + (endtimeLong - starttimeLong) / 1000  
86.                        + "s");   
87.  
88.            }   
89.            jdbcUtil.closePreparedStatement(ps);   
90.        }   
91.        jdbcUtil.closeConnection(connection);  





楼主写的代码让人看着蛋痛

1。所有的基金实体都有一个id
2。而且基金实体个数超过1w个
3。id还分类型,也就是基金实体分类型,同一个类型的id在一个id集合里
4。所有id集合又存放在一个idMap中

我看你的代码是,用了第一种方案
有一个apiName的集合,而且你只打开了一个连接。
每一个apiName创建一个表,每个表都要插入所有的基金实体
这样你的代码写的有问题
如果一个id在所有类型的id集合中只出现一次,那么就是说,你要下载
apiNames.size次这个实体,有问题。


0 请登录后投票
   发表时间:2011-04-07  
下载的东西放到一个Queue里面,做个后台线程定时刷新到数据库中。每次刷新时,Queue有多少记录,就一次性写入数据库中。

InsertQueueService这种概念:[http://code.google.com/p/guzz/wiki/AppendCoreService?wl=zh-Hans#5._数据库插入队列服务:]

0 请登录后投票
   发表时间:2011-04-07  
kanny87929 写道
//任务数  
34.        int taskCount = apiConfig.getTask_count();  
这个是什么我不是很明白

这个线程数,在配置文件中出现的。线程数是动态的。。。。
0 请登录后投票
   发表时间:2011-04-07  
我一个一个解释哈。。 首先api的个数是动态的。 也就是说。api的名字会增加。我把api配置在配置文件中。
一个api对应一个表。 所以就出现第一个for循环。 先创建表。  循环api
2.基金id和基金类型。 a类型下有1w的id b类型下也有n个。 所以就以map储存。 类型做键,idlist做值。
  出现第二个for循环。 循环list。
3.取出每一个类型的idlist后,根据配置文件中配置的线程数,分配每一根线程控制当前idlist的个数。
4.上面这些都不重点。  只是业务逻辑问题。
  
  我的问题是。。多线程下。。 公用一个connection,PreparedStatement 有没有问题。
     理论上应该是有问题的。 但现在没有任何异常。 而且比采用连接池更快。快了一倍的时间。
用连接池:一部分线程用了1s。 一部分线程用了10s 越到后面的线程越慢。(这好说。 因为连接池的连接不够用,后面的线程需等前面的线程先释放掉。所以后面的线程慢一些。)

可我公用一个
PreparedStatement  , 就出现把PreparedStatement 做为参数传递进去。结果是:每一根线程执行的时间相等了。 也就是说。。真正达到了并行的效果。



kanny87929 写道
引用

 Connection connection = jdbcUtil.getConnection();   
46.        for (String apiname : apiNames) {   
47.            String url_head = apiConfig.getApi_url() + apiname + "/";   
48.               
49.            jdbcUtil.insertOrUpdate("CREATE TABLE IF NOT EXISTS t_monitor_"+apiname+createTableSql);   
50.            String sql = "insert t_monitor_"+apiname+sqlString;   
51.            PreparedStatement ps = null;   
52.            ps = connection.prepareStatement(sql);   
53.            for (String idtype : idTypeMap.keySet()) {   
54.                Long starttimeLong = System.currentTimeMillis();   
55.                List<String> idsList = idTypeMap.get(idtype);   
56.                log.info("start download  "+apiname+" " + idtype + " list size:"  
57.                        + idsList.size());   
58.                int number = idsList.size() / taskCount;   
59.                int rem = idsList.size() % taskCount;   
60.                if (rem != 0) {   
61.                    taskCount++;   
62.                }   
63.                CountDownLatch countdown = new CountDownLatch(taskCount);   
64.                for (int i = 0; i < taskCount; i++) {   
65.  
66.                    int startIndex = i * number;   
67.                    int endIndex = (i + 1) * number > idsList.size() ? idsList   
68.                            .size() : (i + 1) * number;   
69.                    List<String> ids = idsList.subList(startIndex, endIndex);   
70.                       
71.                    DownloadTask downloadTask = new DownloadTask(jdbcUtil, con,   
72.                            ids, url_head + idtype + "/", url_param, sql,   
73.                            apiname, idtype, countdown, i,ps);   
74.                    Thread thread = new Thread(downloadTask);   
75.                    thread.start();   
76.                }   
77.                try {   
78.                    long timeout = 60;   
79.                    countdown.await(timeout, TimeUnit.MINUTES);   
80.                } catch (InterruptedException e) {   
81.                    log.error("thread is interrupted", e);   
82.                }   
83.                Long endtimeLong = System.currentTimeMillis();   
84.                log.info("end download  "+apiname+" "  + idtype + " list size:" + idsList.size()   
85.                        + " over time:" + (endtimeLong - starttimeLong) / 1000  
86.                        + "s");   
87.  
88.            }   
89.            jdbcUtil.closePreparedStatement(ps);   
90.        }   
91.        jdbcUtil.closeConnection(connection);  





楼主写的代码让人看着蛋痛

1。所有的基金实体都有一个id
2。而且基金实体个数超过1w个
3。id还分类型,也就是基金实体分类型,同一个类型的id在一个id集合里
4。所有id集合又存放在一个idMap中

我看你的代码是,用了第一种方案
有一个apiName的集合,而且你只打开了一个连接。
每一个apiName创建一个表,每个表都要插入所有的基金实体
这样你的代码写的有问题
如果一个id在所有类型的id集合中只出现一次,那么就是说,你要下载
apiNames.size次这个实体,有问题。



0 请登录后投票
   发表时间:2011-04-07  
共用connection,
你试试auto commit=false,丢几个大事务(latency)或嵌套事务进去,多线程下,你就能蛋疼了。

0 请登录后投票
   发表时间:2011-04-07  
试想一下,
同一个connection/statement,Thread1 prepared啊,batch啊,insert啊,commit的时候,Thread2 rollback first,....
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics