Skip to content

Commit

Permalink
Merge pull request #323 from MyCATApache/v1.07-autohandler
Browse files Browse the repository at this point in the history
V1.07 autohandler
  • Loading branch information
junwen12221 authored Jun 6, 2020
2 parents 6da64d3 + cfff6de commit 8bf94d1
Show file tree
Hide file tree
Showing 78 changed files with 2,095 additions and 332 deletions.
2 changes: 0 additions & 2 deletions common/src/main/java/io/mycat/DataSourceNearness.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,5 @@ public interface DataSourceNearness {

public void setLoadBalanceStrategy(String loadBalanceStrategy);

public void setUpdate(boolean update);

public void clear();
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,12 @@ public void close() {
}

}

/**
* 跳过头部的null
* @return
*/
public List<ColumnInfo> getColumnInfos() {
return columnInfos.subList(1,columnInfos.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
@NoArgsConstructor
@Data
public class InformationSchema {
public class InformationSchema implements Cloneable {

public TABLES_TABLE_OBJECT[] TABLES = new TABLES_TABLE_OBJECT[]{};

Expand Down Expand Up @@ -2253,4 +2253,9 @@ public static class ROUTINES_TABLE_OBJECT {

public String DATABASE_COLLATION;
}

@Override
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.mycat.beans.mycat.MycatRowMetaData;
import io.mycat.beans.mysql.MySQLFieldInfo;
import io.mycat.beans.mysql.MySQLFieldsType;
import io.mycat.util.StringUtil;

import java.sql.ResultSetMetaData;
import java.util.Arrays;
Expand Down Expand Up @@ -71,7 +72,11 @@ byte[] getBytes(String text){
}
public ColumnDefPacketImpl(final MycatRowMetaData resultSetMetaData, int columnIndex) {
try {
this.columnSchema = getBytes(resultSetMetaData.getSchemaName(columnIndex));
String schemaName = resultSetMetaData.getSchemaName(columnIndex);
if (StringUtil.isEmpty(schemaName )){
schemaName = "UNKNOWN";//mysql workbench 该字段不能为长度0
}
this.columnSchema = getBytes(schemaName);
this.columnName = getBytes(resultSetMetaData.getColumnLabel(columnIndex));
this.columnOrgName = getBytes(resultSetMetaData.getColumnName(columnIndex));
this.columnNextLength = 0xC;
Expand Down
2 changes: 1 addition & 1 deletion datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.22.0</version>
<version>1.23.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

public abstract class TransactionSessionTemplate implements TransactionSession {
protected final Map<String, DefaultConnection> updateConnectionMap = new HashMap<>();
protected final DataSourceNearness dataSourceNearness = new DataSourceNearnessImpl();
protected final DataSourceNearness dataSourceNearness = new DataSourceNearnessImpl(this);
final MycatDataContext dataContext;
protected final ConcurrentLinkedQueue<AutoCloseable> closeResourceQueue = new ConcurrentLinkedQueue<>();

Expand Down Expand Up @@ -78,8 +78,6 @@ public void doAction() {
if (!isAutocommit()) {
begin();
}

dataSourceNearness.setUpdate(isInTransaction());
}

abstract protected void callBackBegin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void test() throws Exception {
for (int i = 0; i < 10; i++) {
set.add(TestUtil.getString(statement.executeQuery(
"explain select * from travelrecord"
)));
)).replaceAll("id=\\[\\d+\\]",""));
}
Assert.assertTrue(set.size() == 1);//验证有事务的情况下,不读写分离
}
Expand All @@ -109,7 +109,7 @@ public void test() throws Exception {
for (int i = 0; i < 10; i++) {
set.add(TestUtil.getString(statement.executeQuery(
"explain select * from travelrecord"
)));
)).replaceAll("id=\\[\\d+\\]",""));
}
Assert.assertEquals(1, set.size());//验证无事务的情况下但是set autocommit = 0,不读写分离
}
Expand All @@ -126,7 +126,7 @@ public void test() throws Exception {
for (int i = 0; i < 10; i++) {
set.add(TestUtil.getString(statement.executeQuery(
" select * from travelrecord"
)));
)).replaceAll("id=\\[\\d+\\]",""));
}
Assert.assertEquals(1, set.size());//验证无事务的情况下但是set autocommit = 1,读写分离
}
Expand All @@ -144,7 +144,7 @@ public void test() throws Exception {
for (int i = 0; i < 10; i++) {
set.add(TestUtil.getString(statement.executeQuery(
"explain select * from travelrecord"
)));
)).replaceAll("id=\\[\\d+\\]",""));
}
Assert.assertTrue(set.size() == 1);//验证有事务的情况下,不读写分离
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void test() throws Exception {
for (int i = 0; i < 10; i++) {
set.add(TestUtil.getString(statement.executeQuery(
"explain select * from travelrecord"
)));
)).replaceAll("id=\\[\\d+\\]",""));
}
Assert.assertTrue(set.size() == 1);//验证有事务的情况下,不读写分离
}
Expand All @@ -115,7 +115,7 @@ public void test() throws Exception {
for (int i = 0; i < 10; i++) {
set.add(TestUtil.getString(statement.executeQuery(
"explain select * from travelrecord"
)));
)).replaceAll("id=\\[\\d+\\]",""));
}
Assert.assertEquals(1, set.size());//验证无事务的情况下但是set autocommit = 0,不读写分离
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void test() throws Exception {
for (int i = 0; i < 10; i++) {
set.add(TestUtil.getString(statement.executeQuery(
"explain select * from travelrecord"
)));
)).replaceAll("id=\\[\\d+\\]",""));
}
Assert.assertTrue(set.size() == 1);//验证有事务的情况下,不读写分离
}
Expand All @@ -109,7 +109,7 @@ public void test() throws Exception {
for (int i = 0; i < 10; i++) {
set.add(TestUtil.getString(statement.executeQuery(
"explain select * from travelrecord"
)));
)).replaceAll("id=\\[\\d+\\]",""));
}
Assert.assertEquals(1, set.size());//验证无事务的情况下但是set autocommit = 0,不读写分离
}
Expand Down Expand Up @@ -144,7 +144,7 @@ public void test() throws Exception {
for (int i = 0; i < 10; i++) {
set.add(TestUtil.getString(statement.executeQuery(
"explain select * from travelrecord"
)));
)).replaceAll("id=\\[\\d+\\]",""));
}
Assert.assertTrue(set.size() == 1);//验证有事务的情况下,不读写分离
}
Expand Down
2 changes: 1 addition & 1 deletion hbt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.22.0</version>
<version>1.23.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.codehaus.janino/janino -->
<dependency>
Expand Down
61 changes: 37 additions & 24 deletions hbt/src/main/java/io/mycat/calcite/CalciteRunners.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.mycat.calcite.prepare.MycatCalcitePlanner;
import io.mycat.calcite.resultset.EnumeratorRowIterator;
import io.mycat.calcite.resultset.MyCatResultSetEnumerator;
import io.mycat.calcite.rules.StreamUnionRule;
import io.mycat.calcite.table.SingeTargetSQLTable;
import io.mycat.datasource.jdbc.JdbcRuntime;
import io.mycat.upondb.MycatDBContext;
Expand All @@ -13,6 +14,8 @@
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.TableScan;
Expand All @@ -21,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.StringWriter;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,37 +34,28 @@ public class CalciteRunners {
private final static Logger LOGGER = LoggerFactory.getLogger(CalciteRunners.class);

@SneakyThrows
public static RelNode complie(MycatCalcitePlanner planner, String sql, boolean forUpdate) {
public static RelNode compile(MycatCalcitePlanner planner, String sql, boolean forUpdate) {
SqlNode sqlNode = planner.parse(sql);
SqlNode validate = planner.validate(sqlNode);
RelNode relNode = planner.convert(validate);
return complie(planner, relNode, forUpdate);
return compile(planner, relNode, forUpdate);
}

public static RelNode complie(MycatCalcitePlanner planner, RelNode relNode, boolean forUpdate) {
return planner.pushDownBySQL(planner.eliminateLogicTable(relNode), forUpdate);
public static RelNode compile(MycatCalcitePlanner planner, RelNode relNode, boolean forUpdate) {
try {
relNode = planner.eliminateLogicTable(relNode);
relNode = planner.pullUpUnion(relNode);
relNode = planner.pushDownBySQL(relNode, forUpdate);
return relNode;
}catch (Throwable e){
LOGGER.error("",e);
}
return null;
}


@SneakyThrows
public static RowBaseIterator run(MycatCalciteDataContext calciteDataContext, RelNode relNode) {
Future<?> submit = JdbcRuntime.INSTANCE.getFetchDataExecutorService().submit(new Runnable() {
@Override
@SneakyThrows
public void run() {
fork(calciteDataContext, relNode);
}
});

ArrayBindable bindable1 = Interpreters.bindable(relNode);
submit.get(1, TimeUnit.MINUTES);
Enumerable<Object[]> bind = bindable1.bind(calciteDataContext);

Enumerator<Object[]> enumerator = bind.enumerator();
return new EnumeratorRowIterator(CalciteConvertors.getMycatRowMetaData(relNode.getRowType()), enumerator);
}

private static void fork(MycatCalciteDataContext calciteDataContext, RelNode relNode) throws IllegalAccessException {
Map<String, List<SingeTargetSQLTable>> map = new HashMap<>();
relNode.accept(new RelShuttleImpl() {
@Override
Expand All @@ -73,6 +68,23 @@ public RelNode visit(TableScan scan) {
return super.visit(scan);
}
});

HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
hepProgramBuilder.addMatchLimit(64);

hepProgramBuilder.addRuleInstance(StreamUnionRule.INSTANCE);
final HepPlanner planner2 = new HepPlanner(hepProgramBuilder.build());
planner2.setRoot(relNode);
relNode = planner2.findBestExp();
fork(calciteDataContext, map);
ArrayBindable bindable1 = Interpreters.bindable(relNode);
Enumerable<Object[]> bind = bindable1.bind(calciteDataContext);

Enumerator<Object[]> enumerator = bind.enumerator();
return new EnumeratorRowIterator(CalciteConvertors.getMycatRowMetaData(relNode.getRowType()), enumerator);
}

private static void fork(MycatCalciteDataContext calciteDataContext, Map<String, List<SingeTargetSQLTable>> map) throws IllegalAccessException {
MycatDBContext uponDBContext = calciteDataContext.getUponDBContext();
AtomicBoolean cancelFlag = uponDBContext.cancelFlag();
if (uponDBContext.isInTransaction()) {
Expand All @@ -87,15 +99,16 @@ public RelNode visit(TableScan scan) {
if (list.size() > 1) {
throw new IllegalAccessException("该执行计划重复拉取同一个数据源的数据");
}
Future<RowBaseIterator> submit = JdbcRuntime.INSTANCE.getFetchDataExecutorService()
.submit(() -> connection.executeQuery(table.getMetaData(), table.getSql()));
table.setEnumerable(new AbstractEnumerable<Object[]>() {
@Override
@SneakyThrows
public Enumerator<Object[]> enumerator() {
return new MyCatResultSetEnumerator(cancelFlag, connection.executeQuery(table.getMetaData(), table.getSql()));
return new MyCatResultSetEnumerator(cancelFlag, submit.get(1, TimeUnit.MINUTES));
}
});
}

} else {
Iterator<String> iterator = map.entrySet().stream()
.flatMap(i -> i.getValue().stream())
Expand All @@ -113,7 +126,7 @@ public Enumerator<Object[]> enumerator() {
@Override
@SneakyThrows
public Enumerator<Object[]> enumerator() {
LOGGER.info("------!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
LOGGER.info("拉取数据"+v.getTargetName()+" sql:"+v.getSql());
return new MyCatResultSetEnumerator(cancelFlag, submit.get());
}
};
Expand Down
28 changes: 18 additions & 10 deletions hbt/src/main/java/io/mycat/calcite/CalciteUtls.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.mycat.queryCondition.DataMappingEvaluator;
import io.mycat.queryCondition.SimpleColumnInfo;
import org.apache.calcite.DataContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.rex.*;
import org.apache.calcite.sql.SqlIdentifier;
Expand Down Expand Up @@ -57,13 +59,23 @@ public static List<QueryBackendTask> getQueryBackendTasks(ShardingTableHandler t
List<QueryBackendTask> list = new ArrayList<>();
for (BackendTableInfo backendTableInfo : calculate) {
String backendTaskSQL = getBackendTaskSQL(filters, rawColumnList, projectColumnList, backendTableInfo);
QueryBackendTask queryBackendTask = new QueryBackendTask( backendTableInfo.getTargetName(),backendTaskSQL);
QueryBackendTask queryBackendTask = new QueryBackendTask(backendTableInfo.getTargetName(), backendTaskSQL);
list.add(queryBackendTask);
}
return list;

}

public static void collect(Union e, List<RelNode> unions) {
for (RelNode input : e.getInputs()) {
if (input instanceof Union){
collect((Union)input,unions);
}else {
unions.add(input);
}
}
}

public static List<BackendTableInfo> getBackendTableInfos(ShardingTableHandler table, List<RexNode> filters) {
LOGGER.info("origin filters:{}", filters);
DataMappingEvaluator record = new DataMappingEvaluator();
Expand Down Expand Up @@ -132,8 +144,8 @@ public SqlNode field(int ordinal) {
};
try {
return " where " + context.toSql(null, rexNode).toSqlString(MysqlSqlDialect.DEFAULT).getSql();
}catch (Exception e){
LOGGER.warn("不能生成对应的sql",e);
} catch (Exception e) {
LOGGER.warn("不能生成对应的sql", e);
}
return "";
}
Expand Down Expand Up @@ -163,13 +175,13 @@ public static boolean addFilter(ShardingTableHandler table, DataMappingEvaluator
RexNode left = operands.get(i);
RexNode right = operands.get(j);
if (left instanceof RexCall && right instanceof RexCall) {
if ((left.isA(SqlKind.GREATER_THAN_OR_EQUAL)||left.isA(SqlKind.GREATER_THAN)) && (right.isA(SqlKind.LESS_THAN_OR_EQUAL)||right.isA(SqlKind.LESS_THAN))) {
if ((left.isA(SqlKind.GREATER_THAN_OR_EQUAL) || left.isA(SqlKind.GREATER_THAN)) && (right.isA(SqlKind.LESS_THAN_OR_EQUAL) || right.isA(SqlKind.LESS_THAN))) {
RexNode fisrtExpr = unCastWrapper(((RexCall) left).getOperands().get(0));
RexNode secondExpr =unCastWrapper(((RexCall) right).getOperands().get(0));
RexNode secondExpr = unCastWrapper(((RexCall) right).getOperands().get(0));
if (fisrtExpr instanceof RexInputRef && secondExpr instanceof RexInputRef) {
int index = ((RexInputRef) fisrtExpr).getIndex();
if (index == ((RexInputRef) secondExpr).getIndex()) {
RexNode start =unCastWrapper( ((RexCall) left).getOperands().get(1));
RexNode start = unCastWrapper(((RexCall) left).getOperands().get(1));
RexNode end = unCastWrapper(((RexCall) right).getOperands().get(1));
if (start instanceof RexLiteral && end instanceof RexLiteral) {
String startValue = ((RexLiteral) start).getValue2().toString();
Expand Down Expand Up @@ -199,10 +211,6 @@ public static boolean addFilter(ShardingTableHandler table, DataMappingEvaluator
left = unCastWrapper(left);






RexNode right = call.getOperands().get(1);
right = unCastWrapper(right);
if (left instanceof RexInputRef && right instanceof RexLiteral) {
Expand Down
Loading

0 comments on commit 8bf94d1

Please sign in to comment.