Skip to content

Commit

Permalink
Rewrite insert statement in batches
Browse files Browse the repository at this point in the history
Summary: If the paramset size is too big, we get the error `Query cannot be completed because of Parameter Array capacity of 1048576`. In this diff we limit the number of parameters sent with one INSERT statement.

Test Plan: https://app.circleci.com/pipelines/github/memsql/singlestore-odbc-connector/2986/workflows/fb4486df-8052-45cd-a571-766d723774c1

Reviewers: amakarovych-ua

Reviewed By: amakarovych-ua

Subscribers: engineering-list

JIRA Issues: PLAT-6976

Differential Revision: https://grizzly.internal.memcompute.com/D66924
  • Loading branch information
Pavlo Mishchenko committed Feb 22, 2024
1 parent cafc9c3 commit f2eb11a
Showing 1 changed file with 84 additions and 80 deletions.
164 changes: 84 additions & 80 deletions ma_statement.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
#include <ma_odbc.h>

#define MADB_MIN_QUERY_LEN 5
// SingleStore imposes a limit on the number of parameters sent in one statement. It is defined by
// the global var parametrizer_query_max_params which defaults to 1048576.q
// TODO: PLAT-6999 respect parametrizer_query_max_params value
#define S2_MAX_PARAMS 1024000


struct st_ma_stmt_methods MADB_StmtMethods; /* declared at the end of file */

Expand Down Expand Up @@ -1898,7 +1903,7 @@ SQLRETURN MADB_StmtExecute(MADB_Stmt *Stmt, BOOL ExecDirect)
unsigned int ErrorCount= 0;
unsigned int StatementNr;
unsigned int ParamOffset= 0; /* for multi statements */
SQLULEN j;
SQLULEN j, batchNum;
/* For multistatement direct execution */
char *CurQuery= Stmt->Query.RefinedText, *QueriesEnd= Stmt->Query.RefinedText + Stmt->Query.RefinedLength;

Expand Down Expand Up @@ -1945,12 +1950,6 @@ SQLRETURN MADB_StmtExecute(MADB_Stmt *Stmt, BOOL ExecDirect)
continue; /* bad statement - skip it */
}

MADB_DynString all_params_query;
MADB_InitDynamicString(&all_params_query, "", 1024, 1024);

MADB_DynString combined_insert_query;
MADB_InitDynamicString(&combined_insert_query, "", 1024, 1024);

int numParamRows = Stmt->ParamCount > 0 ? Stmt->Apd->Header.ArraySize : 1;
// We want to send queries of the form INSERT INTO <tbl_name> [<columns_list>] VALUES (?,...)
// as one query with a list of VALUES tuples, e.g. INSERT INTO t VALUES (1, 'i'), (2, 'j'), (3, 'k')
Expand All @@ -1963,37 +1962,86 @@ SQLRETURN MADB_StmtExecute(MADB_Stmt *Stmt, BOOL ExecDirect)
!MADB_FindToken(&Stmt->Query, "ON DUPLICATE KEY UPDATE") && \
!QUERY_IS_MULTISTMT(Stmt->Query);

// when combineQueries is true and rewriteInsert is false, a query of the form
// INSERT INTO <tbl_name> [<columns_list>] VALUES (?,...) will be sent as a multi-statement of the form
// INSERT INTO t VALUES (1, 'i'); INSERT INTO t VALUES (2, 'j'); INSERT INTO t VALUES (3, 'k')
// TODO: combineQueries can be true when Stmt->Query.ReturnsResult is true as well, needs testing
my_bool combineQueries = numParamRows > 1 && !(Stmt->Query.ReturnsResult) && DSN_OPTION(Stmt->Connection, MADB_OPT_FLAG_MULTI_STATEMENTS);
unsigned int valuesOffest = MADB_FindToken(&Stmt->Query, "VALUES") + 6 /* strlen(VALUES) */;
if (rewriteInsert) MADB_DynstrAppendMem(&combined_insert_query, MADB_Token(&Stmt->Query, 0), valuesOffest);
for (j = 0; j < numParamRows; ++j)
if (rewriteInsert)
{
MADB_DynString final_query;
MADB_InitDynamicString(&final_query, "", 1024, 1024);

const CspsControlFlowResult InitParamsRes
= CspsInitStatementParams(Stmt, &final_query, &ErrorCount, &ret, CurQuery, ParamOffset, j);
switch(InitParamsRes) {
case CCFR_OK:
break;
case CCFR_CONTINUE:
MADB_DynstrFree(&final_query);
continue;
case CCFR_ERROR:
MADB_DynstrFree(&final_query);
MADB_DynstrFree(&all_params_query);
MADB_DynstrFree(&combined_insert_query);
goto end;
default:
assert(0);
break;
long rowsInBatch = S2_MAX_PARAMS / MADB_STMT_PARAM_COUNT(Stmt);
unsigned int valuesOffest = MADB_FindToken(&Stmt->Query, "VALUES") + 6 /* strlen(VALUES) */;
// we have to send not more than S2_MAX_PARAMS parameters in one query
for (batchNum = 0; batchNum < (numParamRows + rowsInBatch - 1) / rowsInBatch; ++batchNum)
{
MADB_DynString combined_insert_query;
MADB_InitDynamicString(&combined_insert_query, "", 1024, 1024);
MADB_DynstrAppendMem(&combined_insert_query, MADB_Token(&Stmt->Query, 0), valuesOffest);
for (j = batchNum * rowsInBatch; j < numParamRows && j < batchNum * rowsInBatch + rowsInBatch; ++j)
{
MADB_DynString final_query;
MADB_InitDynamicString(&final_query, "", 1024, 1024);

const CspsControlFlowResult InitParamsRes
= CspsInitStatementParams(Stmt, &final_query, &ErrorCount, &ret, CurQuery, ParamOffset, j);
switch(InitParamsRes) {
case CCFR_OK:
break;
case CCFR_CONTINUE:
MADB_DynstrFree(&final_query);
continue;
case CCFR_ERROR:
MADB_DynstrFree(&final_query);
MADB_DynstrFree(&combined_insert_query);
goto end;
default:
assert(0);
break;
}
if (final_query.length > 0)
{
if (j % rowsInBatch != 0) MADB_DynstrAppend(&combined_insert_query, ",");
MADB_DynstrAppend(&combined_insert_query, final_query.str + valuesOffest);
}
MADB_DynstrFree(&final_query);
}
if (!combineQueries && !rewriteInsert)
MDBUG_C_PRINT(Stmt->Connection, "Running INSERT statement with %d rows", j - batchNum * rowsInBatch);
ret = CspsRunStatementQuery(Stmt, &combined_insert_query, &ErrorCount, ParamOffset);
if (Stmt->Ipd->Header.ArrayStatusPtr)
{
for (j = batchNum * rowsInBatch; j < numParamRows && j < batchNum * rowsInBatch + rowsInBatch; ++j)
{
if (!Stmt->Apd->Header.ArrayStatusPtr || Stmt->Apd->Header.ArrayStatusPtr[j] != SQL_PARAM_IGNORE)
{
Stmt->Ipd->Header.ArrayStatusPtr[j] = SQL_SUCCEEDED(ret) ? SQL_PARAM_SUCCESS : SQL_PARAM_ERROR;
}
}
}
MADB_DynstrFree(&combined_insert_query);
}
Stmt->AffectedRows = mysql_stmt_affected_rows(Stmt->stmt);
if (ret == SQL_ERROR)
{
ErrorCount = numParamRows;
}
}
else
{
for (j = 0; j < numParamRows; ++j)
{
MADB_DynString final_query;
MADB_InitDynamicString(&final_query, "", 1024, 1024);

const CspsControlFlowResult InitParamsRes
= CspsInitStatementParams(Stmt, &final_query, &ErrorCount, &ret, CurQuery, ParamOffset, j);
switch(InitParamsRes) {
case CCFR_OK:
break;
case CCFR_CONTINUE:
MADB_DynstrFree(&final_query);
continue;
case CCFR_ERROR:
MADB_DynstrFree(&final_query);
goto end;
default:
assert(0);
break;
}
ret = CspsRunStatementQuery(Stmt, &final_query, &ErrorCount, ParamOffset);

if (Stmt->Ipd->Header.ArrayStatusPtr)
Expand All @@ -2011,54 +2059,10 @@ SQLRETURN MADB_StmtExecute(MADB_Stmt *Stmt, BOOL ExecDirect)
SQL_PARAM_DIAG_UNAVAILABLE;
}
}
MADB_DynstrFree(&final_query);
}
else
{
if (rewriteInsert && final_query.length > 0)
{
if (j > 0) MADB_DynstrAppend(&combined_insert_query, ",");
MADB_DynstrAppend(&combined_insert_query, final_query.str + valuesOffest);
}
else if (combineQueries)
{
MADB_DynstrAppend(&all_params_query, final_query.str);
MADB_DynstrAppend(&all_params_query, ";");
}
}
MADB_DynstrFree(&final_query);
}
if (rewriteInsert)
{
MDBUG_C_PRINT(Stmt->Connection, "Running INSERT statement with %d rows", numParamRows);
ret = CspsRunStatementQuery(Stmt, &combined_insert_query, &ErrorCount, ParamOffset);
CspsReceiveStatementResults(Stmt, ret);
if (Stmt->Ipd->Header.ArrayStatusPtr)
{
for (j = 0; j < numParamRows; ++j)
{
if (!Stmt->Apd->Header.ArrayStatusPtr || Stmt->Apd->Header.ArrayStatusPtr[j] != SQL_PARAM_IGNORE)
{
Stmt->Ipd->Header.ArrayStatusPtr[j] = SQL_SUCCEEDED(ret) ? SQL_PARAM_SUCCESS : SQL_PARAM_ERROR;
}
}
}
if (ret == SQL_ERROR)
{
ErrorCount = numParamRows;
}
}
else if (combineQueries)
{
MDBUG_C_PRINT(Stmt->Connection, "Running combined query with %d entries", numParamRows);
ret = CspsRunStatementQuery(Stmt, &all_params_query, &ErrorCount, ParamOffset);
CspsReceiveStatementResultsMulti(Stmt, &ErrorCount, ret);
}
else
{
CspsReceiveStatementResults(Stmt, ret);
}
MADB_DynstrFree(&all_params_query);
MADB_DynstrFree(&combined_insert_query);

// Move forward to the next subquery in the multistatement.
if (QUERY_IS_MULTISTMT(Stmt->Query))
Expand Down

0 comments on commit f2eb11a

Please sign in to comment.