SQLite之大数据量批量入库
发布时间:2020-12-12 20:15:16 所属栏目:百科 来源:网络整理
导读:本文给出一个 Java 示例类 BatchTool,演示如何读取 SQL 脚本文件,并以约 500 条语句为一批,通过线程池和 JDBC 事务批量写入 SQLite 数据库。
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Loads a SQL script into a SQLite database in batches.
 *
 * <p>The script is read line by line and cut into individual statements. Every
 * {@link #BATCH_SIZE} statements (or slightly more, when one input line completes
 * several statements) are handed to a worker of a fixed thread pool, which runs
 * them as one JDBC batch inside a single transaction.
 *
 * <p>All workers share one connection and {@link #exucteUpdate(String)} is
 * synchronized, so batches are written one at a time; the pool only overlaps
 * reading/parsing the script with writing to the database.
 */
public class BatchTool {

    /** Number of statements collected before a batch is submitted to the pool. */
    private static final int BATCH_SIZE = 500;

    /** Number of worker threads in the pool. */
    private static final int POOL_SIZE = 20;

    /** DDL executed once, right after the connection is opened. */
    private static final String ddl = "CREATE TABLE IF NOT EXISTS pbeijing_point (OBJECTID INTEGER,NAME TEXT,ADDRESS TEXT,PHONE TEXT,FAX TEXT,TYPE TEXT,CITYCODE TEXT,URL TEXT,EMAIL TEXT,NAME2 TEXT,X INTEGER,Y INTEGER)";

    /** Lazily opened connection shared by all batches; {@code null} while closed. */
    Connection jCon = null;

    /** Worker pool with {@link #POOL_SIZE} threads (not 500, as an older comment claimed). */
    ExecutorService service = Executors.newFixedThreadPool(POOL_SIZE);

    /**
     * Returns the shared connection, opening it and running the DDL on first use.
     *
     * @return the open connection, or {@code null} when the driver class could not
     *         be loaded or the database could not be opened (the error is printed)
     */
    public synchronized Connection getConnection() {
        if (jCon == null) {
            try {
                Class.forName("org.sqlite.JDBC");
                // NOTE(review): "c:newD.db" is a drive-relative path; possibly a backslash
                // was lost when this listing was published - confirm the intended location.
                jCon = DriverManager.getConnection("jdbc:sqlite:c:newD.db");
                // Close the DDL statement as soon as it has run instead of leaking it.
                try (Statement state = jCon.createStatement()) {
                    state.executeUpdate(ddl);
                }
            } catch (SQLException | ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
        return jCon;
    }

    /**
     * Reads a UTF-8 SQL script and executes it in batches on the worker pool, then
     * waits (up to one hour) for the workers and closes the connection.
     *
     * <p>Statements may span several lines and may contain {@code ;} inside quoted
     * literals. A final statement without a trailing {@code ;} is executed as well.
     * The stream is not closed; that stays the caller's responsibility.
     *
     * @param is the script to load
     * @throws IOException if reading the script fails; the pool is still shut down
     *                     and the connection closed in that case
     */
    public void readBatchSQL(InputStream is) throws IOException {
        if (service.isShutdown()) {
            // A previous run shut the pool down; start a fresh one so the instance is reusable.
            service = Executors.newFixedThreadPool(POOL_SIZE);
        }
        BufferedReader bufferReader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
        StatementSplitter splitter = new StatementSplitter();
        List<String> completed = new ArrayList<String>();
        StringBuilder batchSql = new StringBuilder();
        int tag = 0; // statements currently held in batchSql
        try {
            String line;
            while ((line = bufferReader.readLine()) != null) {
                splitter.feed(line, completed);
                // readLine() strips the line break; put it back so tokens on
                // consecutive lines are not fused and "--" comments end here.
                splitter.feed("\n", completed);
                tag += moveInto(batchSql, completed);
                if (tag >= BATCH_SIZE) {
                    service.execute(new SQLiteBatchHandler(batchSql.toString()));
                    batchSql.setLength(0);
                    tag = 0;
                }
            }
            // Flush a last statement that has no terminating ';'.
            splitter.finish(completed);
            moveInto(batchSql, completed);
            if (batchSql.length() > 0) {
                System.out.println("finalSQL:" + batchSql);
                service.execute(new SQLiteBatchHandler(batchSql.toString()));
            }
        } finally {
            shutdownAndClose();
        }
    }

    /**
     * Appends every statement in {@code statements} to {@code batch}, each followed by
     * {@code ";\n"}, and empties the list.
     *
     * @return the number of statements appended
     */
    private static int moveInto(StringBuilder batch, List<String> statements) {
        int moved = statements.size();
        for (String statement : statements) {
            batch.append(statement).append(";\n");
        }
        statements.clear();
        return moved;
    }

    /** Stops accepting work, waits for submitted batches, then closes the connection. */
    private void shutdownAndClose() {
        service.shutdown();
        try {
            if (!service.awaitTermination(1, TimeUnit.HOURS)) {
                System.err.println("Timed out waiting for pending batches; closing the connection anyway.");
            }
            closeConnection();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /** Closes the shared connection and forgets it so a later run opens a new one. */
    private synchronized void closeConnection() {
        // Going through getConnection() keeps the original side effect of creating
        // the database/table even when the script contained no statements.
        Connection con = getConnection();
        jCon = null;
        if (con != null) {
            try {
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Splits SQL text into individual statements.
     *
     * <p>A {@code ;} terminates a statement only outside single- or double-quoted
     * text; {@code --} line comments outside quotes are dropped. Statements are
     * trimmed and blank ones are skipped. Block comments are not recognised.
     *
     * @param batchSQl the SQL text, may be {@code null}
     * @return the statements without their terminating {@code ;}; never {@code null}
     */
    static String[] splitSQL(String batchSQl) {
        List<String> statements = new ArrayList<String>();
        if (batchSQl != null) {
            StatementSplitter splitter = new StatementSplitter();
            splitter.feed(batchSQl, statements);
            splitter.finish(statements);
        }
        return statements.toArray(new String[0]);
    }

    /**
     * Executes one batch of statements in a single transaction.
     *
     * <p>Synchronized because every batch uses the same connection: committing while
     * another thread still has an open statement on it fails.
     *
     * @param batch the statements of the batch, separated by {@code ;}
     */
    private synchronized void exucteUpdate(String batch) {
        Connection con = getConnection();
        if (con == null) {
            // Opening the database failed (already reported); nothing to roll back.
            System.out.println("执行失败:" + batch);
            return;
        }
        try {
            con.setAutoCommit(false);
            try (Statement ste = con.createStatement()) {
                for (String sql : splitSQL(batch)) {
                    ste.addBatch(sql);
                }
                ste.executeBatch();
            }
            con.commit();
        } catch (SQLException | RuntimeException e) {
            e.printStackTrace();
            System.out.println("执行失败:" + batch);
            try {
                con.rollback();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        }
    }

    /** Incremental, quote-aware splitter that turns SQL text into statements. */
    private static final class StatementSplitter {
        /** Text of the statement currently being assembled. */
        private final StringBuilder current = new StringBuilder();
        /** The open quote character, or 0 when outside quoted text. */
        private char quote;
        /** True while skipping the rest of a "--" comment line. */
        private boolean inLineComment;

        /**
         * Consumes {@code text} and appends every statement completed by it to {@code out}.
         * A "--" marker must not be split across two calls.
         */
        void feed(String text, List<String> out) {
            int length = text.length();
            for (int i = 0; i < length; i++) {
                char c = text.charAt(i);
                if (inLineComment) {
                    if (c == '\n' || c == '\r') {
                        inLineComment = false;
                        current.append(c);
                    }
                } else if (quote != 0) {
                    current.append(c);
                    // A doubled quote ('') closes and immediately reopens, which nets out correctly.
                    if (c == quote) {
                        quote = 0;
                    }
                } else if (c == '\'' || c == '"') {
                    quote = c;
                    current.append(c);
                } else if (c == '-' && i + 1 < length && text.charAt(i + 1) == '-') {
                    inLineComment = true;
                    i++;
                } else if (c == ';') {
                    emit(out);
                } else {
                    current.append(c);
                }
            }
        }

        /** Emits whatever is left as a final statement and resets the splitter. */
        void finish(List<String> out) {
            emit(out);
            quote = 0;
            inLineComment = false;
        }

        private void emit(List<String> out) {
            String statement = current.toString().trim();
            current.setLength(0);
            if (!statement.isEmpty()) {
                out.add(statement);
            }
        }
    }

    /** Worker task that writes one batch to the database. */
    private class SQLiteBatchHandler implements Runnable {
        private final String batch;

        public SQLiteBatchHandler(String sql) {
            this.batch = sql;
        }

        @Override
        public void run() {
            boolean interrupted = false;
            try {
                // NOTE(review): the reason for this 50 ms pause is not evident from the code; kept as is.
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
                interrupted = true;
            }
            try {
                if (this.batch.length() > 0) {
                    exucteUpdate(batch);
                }
            } finally {
                if (interrupted) {
                    // The batch is still written, as before; restore the flag afterwards.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Loads the script {@code c:poi.sql} into the database.
     *
     * @param args unused
     * @throws FileNotFoundException if the script does not exist
     * @throws IOException if reading the script fails
     */
    public static void main(String[] args) throws FileNotFoundException, IOException {
        BatchTool s = new BatchTool();
        try (InputStream in = new FileInputStream(new File("c:poi.sql"))) {
            s.readBatchSQL(in);
        }
    }
}
// (Editor: Li Datong) [Disclaimer] All content on this site comes from the internet; the views expressed are solely those of the author and do not represent this site's position. If your rights have been inadvertently infringed, please contact the site administrator promptly to have the content removed.