Re: Parallel batch insert failure in MonetDB through JDBC

Hi, I am trying to insert data into multiple tables in parallel through batch execution of prepared statements. I am facing a problem: the first thread that executes its batch causes the other executions to crash with the underlying exception "prepared statement is no longer available". Is there a limitation on updating the schema from parallel jobs? If not, what may be the cause of this problem? Any light on how I might implement this would be helpful.
code that executes batches
package monet.test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; import java.util.List; import java.util.Random; public class BatchTest { public static class TestRunner implements Runnable { private final static String problemHere = "jdbc:monetdb://localhost/test-db"; static { try { Class.forName("nl.cwi.monetdb.jdbc.MonetDriver"); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private static Connection createConnection() throws SQLException { // happens one time per thread return DriverManager.getConnection(problemHere, "monetdb", "monetdb"); } private static int fieldCount = 10; private static int[] colType = new int[fieldCount]; private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL }; static { Random random = new Random(); // initialize column types for (int i = 0; i < fieldCount; i++) { colType[i] = types[random.nextInt(types.length)]; } } private final String name; private Connection con; private int batchCount; private int batchSize; private String create; private String drop; private String insert; public TestRunner(String string, int bs, int bc) throws SQLException { this.name = string; this.con = createConnection(); this.batchCount = bc; this.batchSize = bs; this.create = "create table " + name + " ("; for (int i = 0; i < fieldCount; i++) { create += (i == 0 ? "" : ",") + "col" + i + " " + getType(colType[i]); } this.create += ")"; this.insert = "insert into " + name + " values ("; for (int i = 0; i < fieldCount; i++) { insert += (i == 0 ? 
"" : ",") + "?"; } this.insert += ")"; this.drop = "drop table " + name; } private static String getType(int i) { switch (i) { case Types.DECIMAL: return "decimal(18,9)"; case Types.VARCHAR: return "varchar(30000)"; case Types.TIMESTAMP: return "timestamp"; } return null; } protected void finalize() throws Throwable { if (con != null) { con.close(); } }; public void run() { System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop); try (Statement stmt = con.createStatement()) { // this will throw the exception of concurrency con synchronized (problemHere) { System.out.format("%d created %s%n", stmt.executeUpdate(create), name); if (!con.getAutoCommit()) { con.commit(); } } } catch (SQLException e1) { e1.printStackTrace(); return; } try (PreparedStatement stmt = con.prepareStatement(insert)) { while (batchCount-- > 0) { for (int i = batchSize; i > 0; i--) { setBatchInput(stmt); stmt.addBatch(); } System.out.format("%s - submitting batch ...%n", name); stmt.executeBatch(); if (!con.getAutoCommit()) { con.commit(); } } } catch (Exception e) { e.printStackTrace(); } finally { if (con != null) { try (Statement stmt = con.createStatement()) { synchronized (problemHere) { System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name); if (!con.getAutoCommit()) { con.commit(); } } } catch (SQLException e1) { e1.printStackTrace(); } try { con.close(); con = null; } catch (SQLException e) { e.printStackTrace(); } } } System.out.format("%s finished.", name); } private static void setBatchInput(PreparedStatement stmt) throws SQLException { for (int i = 1; i <= fieldCount; i++) { stmt.setObject(i, getRandomFieldValue(colType[i - 1])); } } private static Object getRandomFieldValue(int type) { switch (type) { case Types.DECIMAL: return 0; case Types.VARCHAR: return "null"; case Types.TIMESTAMP: return new Timestamp(System.currentTimeMillis()); } return null; } } public static void main(String[] args) { int num = 2; List<Thread> ts = 
new ArrayList<>(); while (num-- > 0) { Thread t = null; try { (t = new Thread(new TestRunner("runner_" + num, 10000, 1))).start(); } catch (SQLException e) { e.printStackTrace(); } ts.add(t); } for (Thread t : ts) { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } } } On Wed, Nov 4, 2015 at 3:20 PM, Siddharth Tyagi <inbox.siddharth@gmail.com> wrote:
Hi, I am trying to insert data into multiple tables in parallel through batch execution of prepared statements. I am facing a problem:
the first thread that executes its batch causes the other executions to crash with the underlying exception "prepared statement is no longer available".
Is there a limitation on updating the schema from parallel jobs? If not, what may be the cause of this problem?
Any light on how I might implement this would be helpful.
code that executes batches
package monet.test;
import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; import java.util.List; import java.util.Random;
/**
 * Reproduction harness for running JDBC batch inserts into several tables in
 * parallel, one table and one connection per thread.
 *
 * <p>Each {@link TestRunner} creates its own table, fills it through a batched
 * {@link PreparedStatement}, drops the table again and closes its connection.
 */
public class BatchTest {

    /**
     * One unit of parallel work: create a table, insert {@code batchCount}
     * batches of {@code batchSize} rows into it, then drop it.
     *
     * <p>An instance owns one connection, opened by the constructor and closed
     * by {@link #run()} on every exit path. An instance can therefore be run
     * only once, and an instance that is constructed but never run keeps its
     * connection open.
     */
    public static class TestRunner implements Runnable {

        private static final String JDBC_URL = "jdbc:monetdb://localhost/test-db";

        /**
         * Serializes DDL (create/drop table) across all runners. A dedicated
         * monitor is used instead of the URL string: string literals are
         * interned, so locking on one shares the monitor with unrelated code.
         */
        private static final Object DDL_LOCK = new Object();

        private static final int FIELD_COUNT = 10;
        private static final int[] TYPES = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

        /** Column types shared by every runner; chosen randomly once per JVM. */
        private static final int[] COL_TYPE = new int[FIELD_COUNT];

        static {
            try {
                Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
            } catch (ClassNotFoundException e) {
                // Best effort: report and carry on; getConnection() will fail
                // later with an SQLException if no driver is registered.
                e.printStackTrace();
            }
            Random random = new Random();
            for (int i = 0; i < FIELD_COUNT; i++) {
                COL_TYPE[i] = TYPES[random.nextInt(TYPES.length)];
            }
        }

        private final String name;
        private final int batchCount;
        private final int batchSize;
        private final String create;
        private final String drop;
        private final String insert;
        private Connection con;

        /**
         * Prepares the SQL for one table and opens the runner's connection.
         *
         * @param string table name; concatenated into the SQL as an identifier,
         *               so it must come from trusted code
         * @param bs     number of rows per batch
         * @param bc     number of batches to submit
         * @throws SQLException if the connection cannot be opened
         */
        public TestRunner(String string, int bs, int bc) throws SQLException {
            this.name = string;
            this.batchSize = bs;
            this.batchCount = bc;

            StringBuilder ddl = new StringBuilder("create table ").append(name).append(" (");
            StringBuilder dml = new StringBuilder("insert into ").append(name).append(" values (");
            for (int i = 0; i < FIELD_COUNT; i++) {
                if (i > 0) {
                    ddl.append(',');
                    dml.append(',');
                }
                ddl.append("col").append(i).append(' ').append(getType(COL_TYPE[i]));
                dml.append('?');
            }
            this.create = ddl.append(')').toString();
            this.insert = dml.append(')').toString();
            this.drop = "drop table " + name;

            // Acquire the connection last so nothing can fail after it is open.
            this.con = createConnection();
        }

        /** Opens a new connection; called once per runner. */
        private static Connection createConnection() throws SQLException {
            return DriverManager.getConnection(JDBC_URL, "monetdb", "monetdb");
        }

        /**
         * Maps a {@link Types} constant to the SQL column type used in the DDL.
         *
         * @throws IllegalArgumentException for a type this test does not use,
         *         instead of letting the text "null" end up in the DDL
         */
        private static String getType(int i) {
            switch (i) {
            case Types.DECIMAL:
                return "decimal(18,9)";
            case Types.VARCHAR:
                return "varchar(30000)";
            case Types.TIMESTAMP:
                return "timestamp";
            default:
                throw new IllegalArgumentException("unsupported java.sql.Types value: " + i);
            }
        }

        /**
         * Creates the table, inserts the batches, drops the table and closes the
         * connection. SQL failures are printed, not propagated. If the table
         * cannot be created, no insert or drop is attempted.
         *
         * @throws IllegalStateException if this runner has already been run
         */
        @Override
        public void run() {
            if (con == null) {
                throw new IllegalStateException(name + " has already been run");
            }
            System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
            try {
                if (!createTable()) {
                    return;
                }
                try {
                    insertBatches();
                } finally {
                    dropTable();
                }
            } finally {
                // Also reached on the early return above, so the connection is
                // released on every path.
                closeConnection();
            }
            System.out.format("%s finished.%n", name);
        }

        /**
         * Creates this runner's table.
         *
         * @return {@code true} on success, {@code false} if the DDL failed
         */
        private boolean createTable() {
            try (Statement stmt = con.createStatement()) {
                // The original author noted that this statement is where the
                // concurrency exception shows up, hence the lock.
                // NOTE(review): if a schema change on one connection invalidates
                // statements prepared on others, creating all tables before any
                // runner prepares its insert may be the real fix - TODO confirm
                // against the MonetDB documentation.
                synchronized (DDL_LOCK) {
                    System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
                    commitIfNeeded();
                }
                return true;
            } catch (SQLException e) {
                e.printStackTrace();
                return false;
            }
        }

        /** Submits {@code batchCount} batches of {@code batchSize} rows each. */
        private void insertBatches() {
            try (PreparedStatement stmt = con.prepareStatement(insert)) {
                for (int batch = 0; batch < batchCount; batch++) {
                    for (int row = 0; row < batchSize; row++) {
                        setBatchInput(stmt);
                        stmt.addBatch();
                    }
                    System.out.format("%s - submitting batch ...%n", name);
                    stmt.executeBatch();
                    commitIfNeeded();
                }
            } catch (SQLException | RuntimeException e) {
                // Thread boundary: report and fall through to the cleanup.
                e.printStackTrace();
            }
        }

        /** Drops this runner's table; failures are printed and otherwise ignored. */
        private void dropTable() {
            try (Statement stmt = con.createStatement()) {
                synchronized (DDL_LOCK) {
                    System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
                    commitIfNeeded();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /** Commits the current transaction unless the connection auto-commits. */
        private void commitIfNeeded() throws SQLException {
            if (!con.getAutoCommit()) {
                con.commit();
            }
        }

        /** Closes the connection and marks this runner as finished. */
        private void closeConnection() {
            try {
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                con = null;
            }
        }

        /** Binds one value per column, matching the shared column types. */
        private static void setBatchInput(PreparedStatement stmt) throws SQLException {
            for (int i = 1; i <= FIELD_COUNT; i++) {
                stmt.setObject(i, getRandomFieldValue(COL_TYPE[i - 1]));
            }
        }

        /**
         * Returns a sample value for the given column type: {@code 0} for
         * DECIMAL, the four-character string {@code "null"} for VARCHAR, the
         * current time for TIMESTAMP, and {@code null} for anything else.
         */
        private static Object getRandomFieldValue(int type) {
            switch (type) {
            case Types.DECIMAL:
                return 0;
            case Types.VARCHAR:
                return "null";
            case Types.TIMESTAMP:
                return new Timestamp(System.currentTimeMillis());
            default:
                return null;
            }
        }
    }

    /**
     * Starts two runners on separate threads and waits for them to finish.
     * A runner whose connection cannot be opened is reported and skipped.
     */
    public static void main(String[] args) {
        int num = 2;
        List<Thread> ts = new ArrayList<>();
        while (num-- > 0) {
            try {
                Thread t = new Thread(new TestRunner("runner_" + num, 10000, 1));
                t.start();
                // Only started threads are recorded; a null entry here used to
                // make the join loop fail with a NullPointerException.
                ts.add(t);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the flag and stop waiting; further joins would be
                // interrupted immediately anyway.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
--
*Thanks and Regards, Siddharth Tyagi*
-- *Thanks and Regards, Siddharth Tyagi*
participants (1)
-
Siddharth Tyagi