Skip to content

Commit

Permalink
fix #215 #161 Improve streaming sort performance.
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohongtao committed Jan 18, 2017
1 parent 4e63e67 commit 73fc752
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import com.dangdang.ddframe.rdb.sharding.merger.resultset.delegate.AbstractDelegateResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.row.OrderByResultSetRow;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

/**
* 流式排序的聚集结果集.
Expand All @@ -37,72 +39,64 @@
@Slf4j
public final class StreamingOrderByReducerResultSet extends AbstractDelegateResultSet {

private final List<OrderByColumn> orderByColumns;
private final Queue<ResultSetOrderByWrapper> delegateResultSetQueue;

private final List<OrderByColumn> orderByKeys;

private final List<OrderByDelegateResultSet> delegateResultSets = new LinkedList<>();
private OrderByDelegateResultSet lastDelegateResultSet ;


public StreamingOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
super(resultSetMergeContext.getShardingResultSets().getResultSets());
orderByColumns = resultSetMergeContext.getCurrentOrderByKeys();
List<ResultSet> mergeResultSets = resultSetMergeContext.getShardingResultSets().getResultSets();
for(ResultSet each: mergeResultSets){
delegateResultSets.add(new OrderByDelegateResultSet(each,orderByColumns));
}
delegateResultSetQueue = new PriorityQueue<>(getResultSets().size());
orderByKeys = resultSetMergeContext.getCurrentOrderByKeys();
}

@Override
protected boolean firstNext() throws SQLException {
for (OrderByDelegateResultSet each : delegateResultSets) {
each.next();
for (ResultSet each : getResultSets()) {
ResultSetOrderByWrapper wrapper = new ResultSetOrderByWrapper(each);
if (wrapper.next()) {
delegateResultSetQueue.offer(wrapper);
}
}
return doNext();
}

@Override
protected boolean afterFirstNext() throws SQLException {
lastDelegateResultSet.next();
ResultSetOrderByWrapper firstResultSet = delegateResultSetQueue.poll();
setDelegate(firstResultSet.delegate);
if (firstResultSet.next()) {
delegateResultSetQueue.offer(firstResultSet);
}
return doNext();
}

private boolean doNext() throws SQLException {
setDelegateResultSet();
return !delegateResultSets.isEmpty();
}

private void setDelegateResultSet() throws SQLException {
OrderByResultSetRow chosenOrderByValue = null;
for (OrderByDelegateResultSet each : delegateResultSets) {
OrderByResultSetRow eachOrderByValue = each.orderByValue;
if (null == chosenOrderByValue || chosenOrderByValue.compareTo(eachOrderByValue) > 0) {
chosenOrderByValue = eachOrderByValue;
setDelegate(each.delegate);
lastDelegateResultSet = each;
}
private boolean doNext() {
if (delegateResultSetQueue.isEmpty()) {
return false;
}
log.trace("Chosen order by value: {}, current result set hashcode: {}", chosenOrderByValue, getDelegate().hashCode());
setDelegate(delegateResultSetQueue.peek().delegate);
log.trace("Chosen order by value: {}, current result set hashcode: {}", delegateResultSetQueue.peek().row, getDelegate().hashCode());
return true;
}

class OrderByDelegateResultSet {
private ResultSet delegate;
private List<OrderByColumn> orderByColumns;
private OrderByResultSetRow orderByValue ;

public OrderByDelegateResultSet(ResultSet delegate, List<OrderByColumn> orderByColumns) throws SQLException {
this.delegate = delegate;
this.orderByColumns = orderByColumns;
}

public boolean next() throws SQLException {

@RequiredArgsConstructor
private class ResultSetOrderByWrapper implements Comparable<ResultSetOrderByWrapper> {

private final ResultSet delegate;

private OrderByResultSetRow row;

boolean next() throws SQLException {
boolean result = delegate.next();
if(result) {
orderByValue = new OrderByResultSetRow(delegate, orderByColumns);
}else {
delegateResultSets.remove(this);
if (result) {
row = new OrderByResultSetRow(delegate, orderByKeys);
}
return result;
}


@Override
public int compareTo(final ResultSetOrderByWrapper o) {
return row.compareTo(o.row);
}
}
}
7 changes: 7 additions & 0 deletions sharding-jdbc-doc/content/03-community/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ next = "/03-community/directory-structure"

+++

## 1.4.2

### 功能提升

1. [ISSUE #215](https://github.com/dangdangdotcom/sharding-jdbc/issues/215) 流式排序的聚集结果集 StreamingOrderByReducerResultSet性能优化
1. [ISSUE #161](https://github.com/dangdangdotcom/sharding-jdbc/issues/161) 结果集归并的时候可以采用堆排序来提升性能

## 1.4.1

### 功能提升
Expand Down

0 comments on commit 73fc752

Please sign in to comment.