Skip to content

Commit

Permalink
fix #152 Make every SQLExecuteUnit to have its own Statement.
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored and gaohongtao committed Oct 26, 2016
1 parent ec657f6 commit 695cf76
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ public boolean execute() throws SQLException {
}
}

private void clearRouteContext() throws SQLException {
protected void clearRouteContext() throws SQLException {
super.clearRouteContext();
clearParameters();
setCurrentResultSet(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import com.dangdang.ddframe.rdb.sharding.router.SQLRouteResult;
import com.google.common.base.Charsets;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -37,9 +36,10 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
* 支持分片的静态语句对象.
Expand All @@ -61,8 +61,7 @@ public class ShardingStatement extends AbstractStatementAdapter {
@Getter
private final int resultSetHoldability;

@Getter(AccessLevel.PROTECTED)
private final Map<HashCode, Statement> cachedRoutedStatements = new HashMap<>();
private final Deque<List<Statement>> cachedRoutedStatements = Lists.newLinkedList();

@Getter(AccessLevel.PROTECTED)
@Setter(AccessLevel.PROTECTED)
Expand All @@ -71,11 +70,11 @@ public class ShardingStatement extends AbstractStatementAdapter {
@Setter(AccessLevel.PROTECTED)
private ResultSet currentResultSet;

public ShardingStatement(final ShardingConnection shardingConnection) {
ShardingStatement(final ShardingConnection shardingConnection) {
this(shardingConnection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency) {
ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency) {
this(shardingConnection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

Expand All @@ -85,6 +84,8 @@ public ShardingStatement(final ShardingConnection shardingConnection, final int
this.resultSetType = resultSetType;
this.resultSetConcurrency = resultSetConcurrency;
this.resultSetHoldability = resultSetHoldability;
cachedRoutedStatements.add(new LinkedList<Statement>());
cachedRoutedStatements.add(new LinkedList<Statement>());
}

@Override
Expand All @@ -94,51 +95,94 @@ public Connection getConnection() throws SQLException {

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
if (null != currentResultSet && !currentResultSet.isClosed()) {
currentResultSet.close();
ResultSet rs;
try {
rs = ResultSetFactory.getResultSet(generateExecutor(sql).executeQuery(), mergeContext);
} finally {
clearRouteContext();
}
currentResultSet = ResultSetFactory.getResultSet(generateExecutor(sql).executeQuery(), mergeContext);
return currentResultSet;
setCurrentResultSet(rs);
return rs;
}

@Override
public int executeUpdate(final String sql) throws SQLException {
return generateExecutor(sql).executeUpdate();
try {
return generateExecutor(sql).executeUpdate();
} finally {
clearRouteContext();
}
}

@Override
public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
return generateExecutor(sql).executeUpdate(autoGeneratedKeys);
try {
return generateExecutor(sql).executeUpdate(autoGeneratedKeys);
} finally {
clearRouteContext();
}
}

@Override
public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
return generateExecutor(sql).executeUpdate(columnIndexes);
try {
return generateExecutor(sql).executeUpdate(columnIndexes);
} finally {
clearRouteContext();
}
}

@Override
public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
return generateExecutor(sql).executeUpdate(columnNames);
try {
return generateExecutor(sql).executeUpdate(columnNames);
} finally {
clearRouteContext();
}
}

@Override
public boolean execute(final String sql) throws SQLException {
return generateExecutor(sql).execute();
try {
return generateExecutor(sql).execute();
} finally {
clearRouteContext();
}
}

@Override
public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
return generateExecutor(sql).execute(autoGeneratedKeys);
try {
return generateExecutor(sql).execute(autoGeneratedKeys);
} finally {
clearRouteContext();
}
}

@Override
public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
return generateExecutor(sql).execute(columnIndexes);
try {
return generateExecutor(sql).execute(columnIndexes);
} finally {
clearRouteContext();
}
}

@Override
public boolean execute(final String sql, final String[] columnNames) throws SQLException {
return generateExecutor(sql).execute(columnNames);
try {
return generateExecutor(sql).execute(columnNames);
} finally {
clearRouteContext();
}
}

protected void clearRouteContext() throws SQLException {
setCurrentResultSet(null);
List<Statement> firstList = cachedRoutedStatements.pollFirst();
cachedRoutedStatements.getFirst().addAll(firstList);
firstList.clear();
cachedRoutedStatements.addLast(firstList);
}

private StatementExecutor generateExecutor(final String sql) throws SQLException {
Expand All @@ -154,12 +198,18 @@ private StatementExecutor generateExecutor(final String sql) throws SQLException
}

protected Statement getStatement(final Connection connection, final String sql) throws SQLException {
HashCode hashCode = Hashing.md5().newHasher().putInt(connection.hashCode()).putString(sql, Charsets.UTF_8).hash();
if (cachedRoutedStatements.containsKey(hashCode)) {
return cachedRoutedStatements.get(hashCode);
Statement statement = null;
for (Iterator<Statement> iterator = cachedRoutedStatements.getFirst().iterator(); iterator.hasNext();) {
Statement each = iterator.next();
if (each.getConnection() == connection) {
statement = each;
iterator.remove();
}
}
Statement statement = generateStatement(connection, sql);
cachedRoutedStatements.put(hashCode, statement);
if (null == statement) {
statement = generateStatement(connection, sql);
}
cachedRoutedStatements.getLast().add(statement);
return statement;
}

Expand Down Expand Up @@ -191,7 +241,13 @@ public ResultSet getResultSet() throws SQLException {
}

@Override
public Collection<? extends Statement> getRoutedStatements() throws SQLException {
return cachedRoutedStatements.values();
protected void clearRouteStatements() {
cachedRoutedStatements.getFirst().clear();
cachedRoutedStatements.getLast().clear();
}

@Override
public Collection<? extends Statement> getRoutedStatements() {
return Lists.newArrayList(Iterators.concat(cachedRoutedStatements.getFirst().iterator(), cachedRoutedStatements.getLast().iterator()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat

private int fetchSize;

protected abstract void clearRouteStatements();

@Override
public final void close() throws SQLException {
for (Statement each : getRoutedStatements()) {
each.close();
}
closed = true;
getRoutedStatements().clear();
clearRouteStatements();
}

@Override
Expand Down Expand Up @@ -218,7 +220,6 @@ public final ResultSet getGeneratedKeys() throws SQLException {
* 获取路由的静态语句对象集合.
*
* @return 路由的静态语句对象集合
* @throws SQLException SQL执行异常
*/
protected abstract Collection<? extends Statement> getRoutedStatements() throws SQLException;
protected abstract Collection<? extends Statement> getRoutedStatements();
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
MasterSlaveDataSourceTest.class,
ParameterListTest.class,
AbstractPreparedStatementAdapterTest.class,
ShardingConnectionTest.class,
ShardingConnectionTest.class,
ShardingPreparedStatementTableOnlyTest.class,
})
public class AllJDBCTests {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.rdb.sharding.jdbc;

import com.dangdang.ddframe.rdb.integrate.tbl.AbstractShardingTablesOnlyDBUnitTest;
import org.junit.Before;
import org.junit.Test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

public final class ShardingPreparedStatementTableOnlyTest extends AbstractShardingTablesOnlyDBUnitTest {

private ShardingDataSource shardingDataSource;

@Before
public void init() throws SQLException {
shardingDataSource = getShardingDataSource();
}

@Test
public void assertExecuteQueryWithParameter() throws SQLException {
String sql = "SELECT COUNT(*) AS `orders_count` FROM `t_order` WHERE `status` = ?";
try (
Connection connection = shardingDataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, "init");
ResultSet resultSet = preparedStatement.executeQuery();
assertTrue(resultSet.next());
assertThat(resultSet.getLong(1), is(20L));
ShardingPreparedStatement sps = (ShardingPreparedStatement) preparedStatement;
assertThat(sps.getRoutedStatements().size(), is(10));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ public void assertOverMaxUpdateRow() throws SQLException {

AbstractStatementAdapter statement = new AbstractStatementAdapter(Statement.class) {
@Override
protected Collection<? extends Statement> getRoutedStatements() throws SQLException {
protected void clearRouteStatements() {

}

@Override
protected Collection<? extends Statement> getRoutedStatements() {
return Lists.newArrayList(st1, st2);
}

Expand Down
1 change: 1 addition & 0 deletions sharding-jdbc-doc/content/post/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ weight = 1
1. [ISSUE #149](https://github.com/dangdangdotcom/sharding-jdbc/issues/149) INSERT IGNORE INTO时如果数据重了忽略时返回的成-1了,应该返回0
1. [ISSUE #118](https://github.com/dangdangdotcom/sharding-jdbc/issues/118) 同一个线程内先执行DQL后执行DML,DML操作在从库上执行
1. [ISSUE #122](https://github.com/dangdangdotcom/sharding-jdbc/issues/122) bed的fail重试问题
1. [ISSUE #152](https://github.com/dangdangdotcom/sharding-jdbc/issues/152) 可能同一个connection多线程导致问题

## 1.3.2

Expand Down

0 comments on commit 695cf76

Please sign in to comment.