Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…nto main
  • Loading branch information
chenghuaWang committed Dec 21, 2022
2 parents 34c9fc5 + a2577d2 commit b8e04fb
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 3 deletions.
38 changes: 38 additions & 0 deletions scripts/libCB.lua
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,43 @@ function Cb.Op.CombineOp(baseOpPtr, primaryKeys, newTableName)
-- for c = 1, output:getShape()[1] do
-- print(c, output:colNameAt(c - 1))
-- end
end

function Cb.Op.FilterOp(baseOpPtr, judgeMethod, modifyMethod)
if #baseOpPtr.io.I ~= 1 then
print("[ CB engine Warning ] when execute Cb.Op.FilterOp. #inputs ~= 1");
return;
end
local inputs = baseOpPtr.io.I[1]; -- vector.
local output = baseOpPtr.io.O; -- virtual table.

if judgeMethod == nil then
print("[ CB engine Error ] when execute Cb.Op.FilterOp. judgeMethod = nil");
return;
end
if modifyMethod == nil then
print("[ CB engine Error ] when execute Cb.Op.FilterOp. modifyMethod = nil");
return;
end

local newRow = inputs:getShape()[0];
local newCol = inputs:getShape()[1];
local bufRow;

output:resetShapeH(Cb.F.makeShapeFull(newRow, newCol));
for r = 1, newRow do
bufRow = inputs:getRow(r - 1)
if judgeMethod(bufRow) then
bufRow = modifyMethod(bufRow)
end
for c = 1, output:getShape()[1] do
output:setPtrAt(r - 1, c - 1, bufRow:atPtr(0, c - 1));
end
end

for c = 1, output:getShape()[1] do
output:setInfoAt(c - 1, inputs:getInfoAt(c - 1));
end

Cb.F.setTableName(output, inputs:getInfoAt(0, 0):getTableName());
end
2 changes: 1 addition & 1 deletion scripts/utils/postScript2.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"id": 100, "script": "node_vd_1 = ThisGraph:createVirtualDeviceNode(0);\nnode_vd_2 = ThisGraph:createVirtualDeviceNode(1);\nnode_vd_1:addQuery(\"SELECT * FROM runoob_tbl;\");\nnode_vd_2:addQuery(\"SELECT * FROM myel;\");\n\nnode_vd_combine = ThisGraph:createCombineNode(Cb.F.PackedStringToVec(\"runoob_id\", \"runoob_id\"), \"NewTable\");\nnode_vd_1:PointTo(Cb.F.refNode(node_vd_combine));\nnode_vd_2:PointTo(Cb.F.refNode(node_vd_combine));\n\nThisGraph:addCacheServer(ThisGraph:createRedisCachingNode(0));\n"}
{"id": 100, "script": "function testJudgeMethod(rowOfTable)\n if rowOfTable:atPtr(0, 0):isInt() then\n if Cb.F.value(rowOfTable:atPtr(0, 0)) < 10 then\n return true;\n end\n end\n return false;\nend\n\nfunction testModifyMethod(rowOfTable)\n rowOfTable:setPtrAt(0, 0, ThisGraph:createKVCell(Cb.F.value(rowOfTable:atPtr(0, 0)) + 1));\n return rowOfTable;\nend\n\nnode_vd_1 = ThisGraph:createVirtualDeviceNode(0);\nnode_vd_2 = ThisGraph:createVirtualDeviceNode(1);\nnode_vd_1:addQuery(\"SELECT * FROM runoob_tbl;\");\nnode_vd_2:addQuery(\"SELECT * FROM myel;\");\n\nnode_vd_combine = ThisGraph:createCombineNode(Cb.F.PackedStringToVec(\"runoob_id\", \"runoob_id\"), \"NewTable\");\nnode_vd_1:PointTo(Cb.F.refNode(node_vd_combine));\nnode_vd_2:PointTo(Cb.F.refNode(node_vd_combine));\nnode_vd_filter = ThisGraph:createFilterNode(testJudgeMethod, testModifyMethod);\nnode_vd_combine:PointTo(Cb.F.refNode(node_vd_filter))\n\nThisGraph:addCacheServer(ThisGraph:createRedisCachingNode(0));\n"}
16 changes: 16 additions & 0 deletions scripts/utils/testBinding.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
function testJudgeMethod(rowOfTable)
if rowOfTable:atPtr(0, 0):isInt() then
if Cb.F.value(rowOfTable:atPtr(0, 0)) < 10 then
return true;
end
end
return false;
end

function testModifyMethod(rowOfTable)
rowOfTable:setPtrAt(0, 0, ThisGraph:createKVCell(Cb.F.value(rowOfTable:atPtr(0, 0)) + 1));
return rowOfTable;
end

node_vd_1 = ThisGraph:createVirtualDeviceNode(0);
node_vd_2 = ThisGraph:createVirtualDeviceNode(1);
node_vd_1:addQuery("SELECT * FROM runoob_tbl;");
Expand All @@ -6,5 +20,7 @@ node_vd_2:addQuery("SELECT * FROM myel;");
node_vd_combine = ThisGraph:createCombineNode(Cb.F.PackedStringToVec("runoob_id", "runoob_id"), "NewTable");
node_vd_1:PointTo(Cb.F.refNode(node_vd_combine));
node_vd_2:PointTo(Cb.F.refNode(node_vd_combine));
node_vd_filter = ThisGraph:createFilterNode(testJudgeMethod, testModifyMethod);
node_vd_combine:PointTo(Cb.F.refNode(node_vd_filter))

ThisGraph:addCacheServer(ThisGraph:createRedisCachingNode(0));
17 changes: 15 additions & 2 deletions src/core/server/task/cbComputeGraph.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "cbComputeGraph.hpp"
#include "task/cbOperator.hpp"
#include "task/cbTable.hpp"
#include "trivial/cbVirtualDevice.hpp"

namespace cb {
Expand Down Expand Up @@ -246,7 +248,9 @@ cbComputeGraph::cbComputeGraph(int32_t idx)

"KVField",

"setTableName", &cbMySQLField::setTable
"setTableName", &cbMySQLField::setTable,

"getTableName", &cbMySQLField::getTable

);

Expand Down Expand Up @@ -342,6 +346,15 @@ cbComputeGraph::cbComputeGraph(int32_t idx)

);

// bind FilterOp
covalentBound.new_usertype<cbOpFilter>(

"cbOpFilter",

"overrideFunc", &cbOpFilter::overload

);

// bind cbVirtualDeviceNode
covalentBound.new_usertype<cbVirtualDeviceNode>(

Expand Down Expand Up @@ -474,7 +487,7 @@ cbOperatorNode* cbComputeGraph::createFilterNode(const sol::function& boolF,
const sol::function& exF) {
cbOpFilter* ansOp = new cbOpFilter(boolF, exF);

ansOp->overload(this->m_sharedLuaStack->get()()["Cb"]["Op"]["Filter"]);
ansOp->overload(this->m_sharedLuaStack->get()()["Cb"]["Op"]["FilterOp"]);
cbOperatorNode* ans = new cbOperatorNode(ansOp);
this->registerNode(ans);
return ans;
Expand Down

0 comments on commit b8e04fb

Please sign in to comment.