数据处理如同流水——介绍下偶的数据流引擎Samsara
前言
代碼、源碼不重要,重要的是思想,希望大家多給建議。
?
正文
微軟有個叫SSIS,引用了數(shù)據(jù)流概念,不過更加強大的是,他基于了sql server,能夠進行數(shù)據(jù)分析,構造數(shù)據(jù)倉庫。
數(shù)據(jù)挖掘的目標的確遠了,不過數(shù)據(jù)引擎我導開發(fā)了一個。
先看個demo。
?
需求:
我有個訂單表POS_SALESORDER,
需要生成一張訂單的消費憑證:POS_SALESORDERRECEIPT,
其中憑證的一些數(shù)據(jù)來源于我的顧客表:USR_PROFILE
傳統(tǒng)的c#代碼:
(取得POS_SALESORDERRECEIPT表,查詢客戶數(shù)據(jù)USR_PROFILE,然后再結合POS_SALESORDER生成憑證)
?
?
DataTable?receipt?=?NoebeManager.Instance.GetEntity("POS_SALESORDERRECEIPT");INoebeCommand?command?=?NoebeManager.Instance.NoebeCommand;
command.SQL?=?"SELECT?*?FROM?USR_PROFILE?WHERE?USERCODE?=?:USERCODE";
command.Parameters.Add("USERCODE",?row["USERCODE"].ToString());
DataTable?usrtb?=?command.ExecuteReader();
DataRow?userrow?=?null;
if?(usrtb.Rows.Count?==?0)
????userrow?=?null;
else
????userrow?=?usrtb.Rows[0];
double?staffcommission?=?0;
double?commission?=?0;
if?(userrow?==?null)
{
????staffcommission?=?CitiboxGlobalStringHelper.default_staffcommission;
}
else
{
????staffcommission?=?double.Parse(userrow["STAFFCOMMISSION"].ToString());
????commission?=?double.Parse(userrow["COMMISSION"].ToString());
}
DataRow?receiptrow?=?receipt.NewRow();
//receiptrow["ID"]?=
receiptrow["ORDERBILLCODE"]?=?row["BILLCODE"];
receiptrow["RECEIPTCODE"]?=?CitiboxGlobalPkHelper.Instance.GetBillPosOrderReceiptPk();
receiptrow["SHOPCODE"]?=?row["SHOPCODE"];
receiptrow["SHOPNAME"]?=?row["SHOPNAME"];
receiptrow["MERCHANTCODE"]?=?row["USERCODE"];
receiptrow["MERCHANTNAME"]?=?row["USERNAME"];
receiptrow["CREATEDATE"]?=?Pixysoft.Tools.GlobalTimer.Instance.GetGlobalTime();
receiptrow["MODIDATE"]?=?Pixysoft.Tools.GlobalTimer.Instance.GetGlobalTime();
receiptrow["ORDERTEMPLATECODE"]?=?row["TEMPLATECODE"];
receiptrow["ORDERTEMPLATENAME"]?=?row["TEMPLATENAME"];
receiptrow["DEPOSITPRICE"]?=?row["DEPOSITPRICE"];
receiptrow["ITEMPRICE"]?=?row["ITEMPRICE"];
receiptrow["REALPRICE"]?=?row["ITEMPRICE"];
receiptrow["STATUS"]?=?(int)BillIntStatus.New;
receiptrow["REMARK"]?=?"訂單成功";
receiptrow["COMMISSION"]?=?commission;
receiptrow["STAFFCOMMISSION"]?=?staffcommission;
receipt.Rows.Add(receiptrow);
CstNoebeManager.Instance.ClientManager.Session.AutoInsert(receipt);
?
如果用數(shù)據(jù)流引擎:
?
?
?
IDataflow?dataflow?=?SamsaraManager.Instance.Dataflow;IInput?input?=?dataflow.GetInput();
input.Add(row);
input.Add("@DEFAULTSTAFFCOMMISSION",?CitiboxGlobalStringHelper.default_staffcommission);
input.Add("@STATUS",?(int)BillIntStatus.New);
input.Add("@RECEIPTCODE",?CitiboxGlobalPkHelper.Instance.GetBillPosOrderReceiptPk());
dataflow.Initialize(input);
IExchanger?exchanger?=?dataflow.GetExchanger("POS_SALESORDERRECEIPT");
exchanger.AddScript("ORDERBILLCODE?=?POS_SALESORDER.BILLCODE");
exchanger.AddScript("RECEIPTCODE?=?@RECEIPTCODE");
exchanger.AddScript("SHOPCODE?=?POS_SALESORDER.SHOPCODE");
exchanger.AddScript("SHOPNAME?=?POS_SALESORDER.SHOPNAME");
exchanger.AddScript("MERCHANTCODE?=?POS_SALESORDER.USERCODE");
exchanger.AddScript("MERCHANTNAME?=?POS_SALESORDER.USERNAME");
exchanger.AddScript("CREATEDATE?=?SYS.DATETIME");
exchanger.AddScript("MODIDATE?=?SYS.DATETIME");
exchanger.AddScript("ORDERTEMPLATECODE?=?POS_SALESORDER.TEMPLATECODE");
exchanger.AddScript("ORDERTEMPLATENAME?=?POS_SALESORDER.TEMPLATENAME");
exchanger.AddScript("DEPOSITPRICE?=?POS_SALESORDER.DEPOSITPRICE");
exchanger.AddScript("ITEMPRICE?=?POS_SALESORDER.ITEMPRICE");
exchanger.AddScript("REALPRICE?=?POS_SALESORDER.ITEMPRICE");
exchanger.AddScript("STATUS?=?@STATUS");
exchanger.AddScript("REMARK?=?'訂單成功'");
dataflow.Runflow(exchanger);
ILoader?loader?=?dataflow.GetLoader("USR_PROFILE");
loader.Sql?=?"SELECT?STAFFCOMMISSION,COMMISSION??FROM?USR_PROFILE?WHERE?USERCODE?=?:USERCODE";
loader.AddScript("USERCODE?=?POS_SALESORDER.USERCODE");
dataflow.Runflow(loader);
if?(loader.Succeed.IsAlive)
{
????IExchanger?subexchanger?=?dataflow.GetExchanger("POS_SALESORDERRECEIPT");
????subexchanger.AddScript("COMMISSION?=?USR_PROFILE.COMMISSION");
????subexchanger.AddScript("STAFFCOMMISSION?=?USR_PROFILE.STAFFCOMMISSION");
????dataflow.Runflow(subexchanger);
}
else
{
????IExchanger?subexchanger?=?dataflow.GetExchanger("POS_SALESORDERRECEIPT");
????subexchanger.AddScript("COMMISSION?=?0");
????subexchanger.AddScript(ScriptType.Number,?"STAFFCOMMISSION?=?@DEFAULTSTAFFCOMMISSION");
????dataflow.Runflow(subexchanger);
}
IOutput?output?=?dataflow.GetOutput();
DataTable?receipttb?=?output.GetInsertTable("POS_SALESORDERRECEIPT");
CstNoebeManager.Instance.ClientManager.Session.AutoInsert(receipttb);
?
?
似乎代碼沒有什么節(jié)省。不過,如果我的生成的表數(shù)據(jù)非常復雜,比如:多個表的四則運算、函數(shù)運算,那么傳統(tǒng)就需要寫一大堆的小方法,算好了,再傳遞給字段。
這個時候,數(shù)據(jù)流引擎就發(fā)揮作用了,所有的函數(shù)運算僅需要寫好表達式,自動計算。
數(shù)據(jù)流模塊
IExchanger 就是上文的數(shù)據(jù)交換
ILoader 讀取數(shù)據(jù)庫裝載數(shù)據(jù)
Ifer 條件判斷,例如當訂單價格ITEMPRICE>30的時候,xxx
ISwitcher 值判斷,例如根據(jù)訂單客戶類型MERCHANTTYPECODE,進行不同的處理
IMapper 字段值映射,例如把某個占位符映射成一個具體的值,@STATUS = 1
Injector 數(shù)據(jù)中途注入,除了數(shù)據(jù)庫裝載,可以在中途注入新的數(shù)據(jù),再次運算。
Isorter 流排序,如果裝載了新的數(shù)據(jù),和舊的對不上,那么通過排序能夠重新接上(例如先后裝載表A,表B,但是大家對不上好,那么我根據(jù)條件表A.Merchantcode = 表B.merchantcode排序之后,就對上了)
最后還有個Foreach功能,和MergeForeach,把數(shù)據(jù)流分開處理后,合并。
?
一個復雜的數(shù)據(jù)流處理案例(samsara可以做的更多!):
?
?
SalesClosingReceipt?closingreceipt?=?new?SalesClosingReceipt();closingreceipt.Merchantcode?=?webClosingRow["MERCHANTCODE"].ToString();
closingreceipt.Merchantname?=?webClosingRow["MERCHANTNAME"].ToString();
//取得本地結算表
string?pk?=?CitiboxGlobalPkHelper.Instance.GetBillSaleClosingPk();
Info("get?primary?key?for?balance?bill.?pk?=?"?+?pk);
IDataflow?dataflow?=?SamsaraManager.Instance.Dataflow;
IInput?input?=?dataflow.GetInput();
input.Add(webClosingRow);
input.Add("@BILL_PRIMARYKEY",?pk);
input.Add("@DEFAULT_USRBOXCODE",?CitiboxGlobalStringHelper.default_usrboxcode);
dataflow.Initialize(input);
//取得網站結算單
Info("get?web_salesclosing?detail.");
ILoader?loader?=?dataflow.GetLoader("WEB_SALESCLOSINGDETAIL");
loader.Sql?=?"SELECT?*?FROM?WEB_SALESCLOSINGDETAIL?WHERE?BILLCODE?=?:BILLCODE";
loader.AddScript("BILLCODE?=?WEB_SALESCLOSING.BILLCODE");
dataflow.Runflow(loader);
//生成本地結算單
foreach?(IDataflow?subflow?in?dataflow.Foreach("WEB_SALESCLOSINGDETAIL"))
{
????Ifer?ifflow?=?subflow.If("WEB_SALESCLOSINGDETAIL.USRBOXCODE?==?@DEFAULT_USRBOXCODE");
????IDataflow?iftrueflow?=?ifflow.True;
????bool?hasreceipt?=?false;
????if?(iftrueflow.IsAlive)
????{
????????hasreceipt?=?GetCurrentNonReceiptTable(iftrueflow).Succeed.IsAlive;
????}
????IDataflow?iffalseflow?=?ifflow.False;
????if?(iffalseflow.IsAlive)
????{
????????ILoader?usrboxloader?=?UsrboxIsUnavailable(subflow);
????????if?(!usrboxloader.Succeed.IsAlive)
????????{
continue;
????????}
????????else
????????{
hasreceipt?=?GetCurrentReceiptTable(iffalseflow).Succeed.IsAlive;
????????}
????}
????Info("create?BIL_SALESCLOSINGDETAIL");
????if?(hasreceipt)
????{
????????IExchanger?exchangerflow?=?subflow.GetExchanger("BIL_SALESCLOSINGDETAIL");
????????exchangerflow.AddScript("BILLCODE?=?@BILL_PRIMARYKEY");
????????exchangerflow.AddScript(ScriptType.Number,?"CLOSINGPRICE?=?SUM(?POS_ITEMRECEIPT.SALEPRICE?*?POS_ITEMRECEIPT.SALEQTY?)");
????????exchangerflow.AddScript(ScriptType.Number,?"CLOSINGCOMMISSION?=?SUM(?POS_ITEMRECEIPT.SALEPRICE?*?POS_ITEMRECEIPT.SALEQTY?*?POS_ITEMRECEIPT.COMMISSION?)");
????????exchangerflow.AddScript(ScriptType.Number,?"CLOSINGSTAFFCOMMISSION?=?SUM(?POS_ITEMRECEIPT.SALEPRICE?*?POS_ITEMRECEIPT.SALEQTY?*?POS_ITEMRECEIPT.STAFFCOMMISSION?)");
????????exchangerflow.AddScript("CLOSINGDATEFROM?=?WEB_SALESCLOSINGDETAIL.CLOSINGDATEFROM");
????????exchangerflow.AddScript("CLOSINGDATETO?=?WEB_SALESCLOSINGDETAIL.CLOSINGDATETO");
????????exchangerflow.AddScript("CLOSINGDATE?=?WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
????????exchangerflow.AddScript("USRBOXCODE?=?WEB_SALESCLOSINGDETAIL.USRBOXCODE");
????????subflow.Runflow(exchangerflow);
????}
????else
????{
????????IExchanger?exchangerflow?=?subflow.GetExchanger("BIL_SALESCLOSINGDETAIL");
????????exchangerflow.AddScript("BILLCODE?=?@BILL_PRIMARYKEY");
????????exchangerflow.AddScript(ScriptType.Number,?"CLOSINGPRICE?=?0");
????????exchangerflow.AddScript(ScriptType.Number,?"CLOSINGCOMMISSION?=?0");
????????exchangerflow.AddScript(ScriptType.Number,?"CLOSINGSTAFFCOMMISSION?=?0");
????????exchangerflow.AddScript("CLOSINGDATEFROM?=?WEB_SALESCLOSINGDETAIL.CLOSINGDATEFROM");
????????exchangerflow.AddScript("CLOSINGDATETO?=?WEB_SALESCLOSINGDETAIL.CLOSINGDATETO");
????????exchangerflow.AddScript("CLOSINGDATE?=?WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
????????exchangerflow.AddScript("USRBOXCODE?=?WEB_SALESCLOSINGDETAIL.USRBOXCODE");
????????subflow.Runflow(exchangerflow);
????}
????ILoader?blsloader?=?GetBalanceControlTable(subflow);
????IDataflow?blstrueflow?=?blsloader.Succeed;
????if?(blstrueflow.IsAlive)
????{
????????IExchanger?blsexchanger?=?blstrueflow.GetExchanger("BLS_COMMODITYACCOUNTCONTROL");
????????blsexchanger.AddScript("CONTROLDATE?=?WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
????????blsexchanger.AddScript("MODIDATE?=?SYS.DATETIME");
????????blsexchanger.AddScript("LASTCLOSINGPRICE?=?BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
????????blsexchanger.AddScript("LASTCLOSINGCOMMISSION?=?BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION");
????????blsexchanger.AddScript("LASTCLOSINGSTAFFCOMMISSION?=?BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION");
????????blstrueflow.Runflow(blsexchanger);
????????IExchanger?webexchanger?=?blstrueflow.GetExchanger("WEB_SALESCLOSINGDETAIL");
????????webexchanger.AddScript("REALCLOSINGPRICE?=?BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
????????webexchanger.AddScript("REALCLOSINGCOMMISSION?=?BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION");
????????webexchanger.AddScript("REALCLOSINGSTAFFCOMMISSION?=?BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION");
????????webexchanger.AddScript("REALCLOSINGPRICE?=?BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
????????blstrueflow.Runflow(webexchanger);
????}
????IDataflow?blsfalseflow?=?blsloader.Failed;
????if?(blsfalseflow.IsAlive)
????{
????????Error(string.Format("missing?bls_commodityaccountcontrol.?user?validation?fail.?merchantcode?=?{0}",
webClosingRow["MERCHANTCODE"].ToString()));
????????return?null;
????}
????IDataflow?absreceiptflow?=?subflow.If("POS_ITEMRECEIPT.STATUS?==?"?+?BillStringStatus.Abnomity).True;
????{
????????if?(absreceiptflow.IsAlive)
????????{
IExchanger?absexchanger?=?absreceiptflow.GetExchanger("POS_ITEMRECEIPT");
absexchanger.AddScript("STATUS?=?"?+?BillStringStatus.New);
absexchanger.AddScript("CREATEDATE?=?SYS.DATETIME");
absreceiptflow.Runflow(absexchanger);
????????}
????}
????DataTable?closingdetailtb?=?subflow.Peekflow("WEB_SALESCLOSINGDETAIL");
????DataTable?receipttb?=?subflow.Peekflow("POS_ITEMRECEIPT");
????if?(closingdetailtb.Rows.Count?==?0)
????????continue;
????DataRow?closingdetailrow?=?closingdetailtb.Rows[0];
????SalesClosingItem?closingitem?=?new?SalesClosingItem();
????closingitem.Boxlocationcode?=?closingdetailrow["BOXLOCATIONCODE"].ToString();
????closingitem.Datefrom?=?closingdetailrow["CLOSINGDATEFROM"].ToString();
????closingitem.Dateto?=?closingdetailrow["CLOSINGDATETO"].ToString();
????closingitem.Price?=?(double)closingdetailrow["REALCLOSINGPRICE"];
????closingitem.Commission?=?(double)closingdetailrow["REALCLOSINGCOMMISSION"];
????closingitem.Receipttb?=?receipttb;
????closingreceipt.Items.Add(closingitem);
}
dataflow.MergeForeach();
Info("begin?change?BIL_SALESCLOSING");
IExchanger?mainbillexchanger?=?dataflow.GetExchanger("BIL_SALESCLOSING");
mainbillexchanger.AddScript("BILLCODE?=?@BILL_PRIMARYKEY");
mainbillexchanger.AddScript("MERCHANTCODE?=?WEB_SALESCLOSING.MERCHANTCODE");
mainbillexchanger.AddScript("CREATEDATE?=?SYS.DATETIME");
mainbillexchanger.AddScript("MODIDATE?=?SYS.DATETIME");
mainbillexchanger.AddScript(ScriptType.Number,?"CLOSINTTOTALPRICE?=?SUM?(BIL_SALESCLOSINGDETAIL.CLOSINGPRICE)");
mainbillexchanger.AddScript(ScriptType.Number,?"CLOSINTTOTALCOMMISSION?=?SUM?(BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION)");
mainbillexchanger.AddScript(ScriptType.Number,?"CLOSINGTOTALSTAFFCOMMISSION?=?SUM?(BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION)");
dataflow.Runflow(mainbillexchanger);
Info("begin?change?web_salesclosing?status?to?pass.");
IExchanger?mainwebexchanger?=?dataflow.GetExchanger("WEB_SALESCLOSING");
mainwebexchanger.AddScript(ScriptType.Number,?"STATUS?=?"?+?(int)BillIntStatus.Pass);
dataflow.Runflow(mainwebexchanger);
IOutput?output?=?dataflow.GetOutput();
?
后記
?
代碼亂了。
說下samsara,是佛教中輪回的意思。
第一階段:
當時是5年前,開發(fā)一個信息系統(tǒng),被客戶搞煩了,整天要修改表結構,因此我想出了一個用腳本去運算表的思路。成為第一代samsara。
當時剛剛接觸c#,xml之類的,因此所有的配置用xml,samsara讀取xml之后直接運算。
事實上發(fā)現(xiàn)了,xml根本不是給人看的,維護起來太麻煩了。而且把企業(yè)的業(yè)務邏輯綁在xml,debug的時候不知道為什么會有異常。
第二階段:
因此畢業(yè)的時候,開發(fā)了samsara 第二代。把腳本簡化,使用人讀的語言,而不是xml。
能夠減少一些開發(fā)難度。但是企業(yè)業(yè)務邏輯還是綁定在xml,維護非常不方便。
第三階段:
之后工作了,一直沒有時間用samsara,自己也沒有信心,所以在后來系統(tǒng)里面簡單調用了一下之后就荒廢了。
現(xiàn)在正好工作沒了,有一段空閑的時間,讓我好好根據(jù)這幾年的積累重新修改。
于是提出了腳本與代碼結合的方式,成為了現(xiàn)在的samsara第三代。
他的特點是,業(yè)務的邏輯由代碼完成,我的samsara盡量的接近c#的一些邏輯處理。然后一些復雜的數(shù)據(jù)運算交給腳本完成。
我個人認為,第三代samsara可以商業(yè)化了。接下來,第四代samsara完全可以開發(fā)數(shù)據(jù)倉庫了。
或者可以考慮把對象運算加入,成為對象流引擎。我稱為
samsara 第四代!
?
?
?
轉載于:https://www.cnblogs.com/zc22/archive/2009/10/21/1587198.html
總結
以上是生活随笔為你收集整理的数据处理如同流水——介绍下偶的数据流引擎Samsara的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: NB-IoT技术实战开发 ----- N
- 下一篇: 落尘曦的书籍分享-数据库相关书籍