赞
踩
1、程序连接数据库,使用c3p0线程池;
2、程序使用线程池,多线程编程;
3、采用Fork/Join框架线程池(工作窃取(work-stealing)算法),更高效的多线程编程算法。
直接贴代码,代码举例中,近用小规模数据模拟大数据下的数据库批量插入操作。
1、数据库连接池
- package com.example.jdbcConnection;
-
- import com.mchange.v2.c3p0.ComboPooledDataSource;
-
- import java.sql.Connection;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.List;
-
- /**
- * Created by Liuxd on 2018/8/19.
- */
- public class TestC3p0 {
-
- private static Connection conn;
- private static ComboPooledDataSource dataSource;
-
- static {
- try {
- //获得c3p0连接池对象
- dataSource = new ComboPooledDataSource();
-
- dataSource.setUser("root");
- dataSource.setPassword("root");
- dataSource.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/foo?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8&useSSL=false");
- dataSource.setDriverClass("com.mysql.jdbc.Driver");
- dataSource.setInitialPoolSize(2);//初始化池大小
- dataSource.setMaxIdleTime(30);//最大空闲时间
- dataSource.setMaxPoolSize(20);//最多连接数
- dataSource.setMinPoolSize(2);//最少连接数
- dataSource.setMaxStatements(50);//每次最多可以执行多少个批处理语句
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- /**
- * 查询
- */
- private static List<Object[]> query() {
-
- List<Object[]> list = new ArrayList<Object[]>();
- try {
-
- // 获取数据库连接
- conn = dataSource.getConnection();
- // 查询sql
- String sql = "select * from user";
-
-
- // 读取数据
- PreparedStatement preparedStatement = conn.prepareStatement(sql);
- //结果集
- ResultSet resultSet = preparedStatement.executeQuery();
-
- while (resultSet.next()) {
- int uid = resultSet.getInt("uid");
- String name = resultSet.getString("name");
- Integer age = resultSet.getInt("age");
- String phone = resultSet.getString("phone");
- String passwd = resultSet.getString("passwd");
-
- Object[] objects = new Object[]{uid, name, age, phone, passwd};
-
- list.add(objects);
- }
-
- resultSet.close();
- preparedStatement.close();
- //Connection连接对象归还数据库连接池
- conn.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- return list;
- }
-
-
- /**
- * 新增
- */
- public static void add(String name, int age, String phone, String passwd) {
-
- try {
-
- // 获取数据库连接
- conn = dataSource.getConnection();
-
- String insertSql = "insert into `user` (`name`, `age`, `phone`, `passwd`) values(?,?,?,?)";
-
- PreparedStatement ps = conn.prepareStatement(insertSql);
- ps.setString(1, name);
- ps.setInt(2, age);
- ps.setString(3, phone);
- ps.setString(4, passwd);
- int row = ps.executeUpdate();
-
- System.out.println("新增结果: " + row);
-
-
- ps.close();
- //Connection连接对象归还数据库连接池
- conn.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- /**
- * 修改
- */
- private static void update(int uid, String name, int age, String phone, String passwd) {
-
- try {
-
- // 获取数据库连接
- conn = dataSource.getConnection();
-
- String updateSql = "UPDATE USER t SET t.name=? ,t.age=?,t.phone=?,t.passwd=? WHERE t.uid=?";
-
- PreparedStatement preparedStatement = conn.prepareStatement(updateSql);
- preparedStatement.setString(1, name);
- preparedStatement.setInt(2, age);
- preparedStatement.setString(3, phone);
- preparedStatement.setString(4, passwd);
- preparedStatement.setLong(5, uid);
- // 执行sql
- preparedStatement.executeUpdate();
-
- int row = preparedStatement.executeUpdate();
-
- System.out.println("修改结果: " + row);
-
- //Connection连接对象归还数据库连接池
- conn.close();
- preparedStatement.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
-
- /**
- * 删除
- */
- private static void deleteById(int uid) {
-
- try {
-
- // 获取数据库连接
- conn = dataSource.getConnection();
-
- String sql = "delete from USER where uid=?";
-
- PreparedStatement preparedStatement = conn.prepareStatement(sql);
- preparedStatement.setInt(1, uid);
-
- int row = preparedStatement.executeUpdate();
-
- System.out.println("删除结果: " + row);
-
- preparedStatement.close();
- //Connection连接对象归还数据库连接池
- conn.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
-
- public static void main(String[] args) {
- /**
- * 1、验证连接数
- */
- for (int i = 0; i < 10; i++) {
- Connection connection = null;
- try {
- connection = dataSource.getConnection();
- System.out.println(connection.toString());
- } catch (SQLException e) {
- e.printStackTrace();
- } finally {
- if (null != connection) {
- try {
- connection.close();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- }
-
- }
-
- /**
- * 2、查询
- */
- List<Object[]> list = query();
-
- if (null != list && list.size() > 0) {
- for (int i = 0; i < list.size(); i++) {
- Object[] objects = list.get(i);
- for (int j = 0; j < objects.length; j++) {
- System.out.print(objects[j] + " ");
- }
- System.out.println();
- }
- }
-
- /**
- * 3、新增
- */
- String name = "乐乐";
- int age = 17;
- String phone = "13800138001";
- String passwd = "admin123";
- add(name, age, phone, passwd);
-
- /**
- * 4、修改
- */
- update(12, name, age, phone, passwd);
-
- /**
- * 5、删除
- */
- deleteById(3);
-
- }
-
- }

2、Fork/Join框架类
- package com.example.jdbcConnection;
-
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.TimeUnit;
-
- /**
- * Created by Liuxd on 2018/8/23.
- */
- public class TestForkJoinPool {
- public static void main(String[] args) throws Exception {
- System.out.println("*****************************程序开始执行*****************************");
- // 创建线程池,包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- TestC3p0 testC3p0 = new TestC3p0();
- // 提交可拆分的Task任务
- forkJoinPool.submit(new MyTask(0, 1000,testC3p0));
- //阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
- forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
- // 关闭线程池
- forkJoinPool.shutdown();
- System.out.println("*****************************程序执行结束*****************************");
- }
- }

3、Task执行类
- package com.example.jdbcConnection;
-
-
- import java.util.concurrent.RecursiveAction;
- /**
- *
- * reated by Liuxd on 2018/8/23.
- */
- class MyTask extends RecursiveAction {
- // 每个"小任务"最多执行保存20个数
- private static final int MAX = 20;
-
- private int start;
- private int end;
-
- private TestC3p0 testC3p0;
-
- MyTask(int start, int end,TestC3p0 testC3p0) {
- this.start = start;
- this.end = end;
- this.testC3p0=testC3p0;
- }
-
- @Override
- protected void compute() {
- // 当end-start的值小于MAX时候,开始执行
- if ((end - start) < MAX) {
- for (int i = start; i < end; i++) {
- String name = "乐乐"+i;
- int age = 17+i;
- String phone = "1380013800"+i;
- String passwd = "admin123"+i;
- testC3p0.add(name, age, phone, passwd);
- System.out.println(Thread.currentThread().getName() + "保存"+name+" "+age+" "+" "+phone+" "+passwd);
- }
- } else {
- // 将大任务分解成两个小任务
- int middle = (start + end) / 2;
- MyTask left = new MyTask(start, middle,testC3p0);
- MyTask right = new MyTask(middle, end,testC3p0);
-
- // 并行执行两个小任务
- left.fork();
- right.fork();
- }
- }
- }
-
-

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。