1:MSSQL
SQL語法篇:
BULK INSERT
[ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ]
FROM 'data_file'
[ WITH
(
[ [ , ] BATCHSIZE = batch_size ]
[ [ , ] CHECK_CONSTRAINTS ]
[ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ]
[ [ , ] DATAFILETYPE =
{ 'char' | 'native'| 'widechar' | 'widenative' } ]
[ [ , ] FIELDTERMINATOR = 'field_terminator' ]
[ [ , ] FIRSTROW = first_row ]
[ [ , ] FIRE_TRIGGERS ]
[ [ , ] FORMATFILE = 'format_file_path' ]
[ [ , ] KEEPIDENTITY ]
[ [ , ] KEEPNULLS ]
[ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ]
[ [ , ] LASTROW = last_row ]
[ [ , ] MAXERRORS = max_errors ]
[ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ]
[ [ , ] ROWS_PER_BATCH = rows_per_batch ]
[ [ , ] ROWTERMINATOR = 'row_terminator' ]
[ [ , ] TABLOCK ]
[ [ , ] ERRORFILE = 'file_name' ]
)]
SQL示例:
bulk insert 表名 from 'D:\mydata.txt'
with
(fieldterminator=',',
rowterminator='\n',
check_constraints)
select * from 表名
由于C#提供了SqlBulkCopy,所以非DBA的我們,更多會通過程序來調用:
C#代碼篇:
C#代碼調用示例及細節,以下代碼摘錄自CYQ.Data:
using (SqlBulkCopy sbc = new SqlBulkCopy(con, (keepID ? SqlBulkCopyOptions.KeepIdentity : SqlBulkCopyOptions.Default) |SqlBulkCopyOptions.FireTriggers, sqlTran))
{
sbc.BatchSize= 100000;
sbc.DestinationTableName=SqlFormat.Keyword(mdt.TableName, DalType.MsSql);
sbc.BulkCopyTimeout=AppConfig.DB.CommandTimeout;foreach (MCellStruct column inmdt.Columns)
{
sbc.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
sbc.WriteToServer(mdt);
}
有5個細節:
1:事務:
如果只是單個事務,構造函數可以是鏈接字符串。
如果需要和外部合成一個事務(比如先刪除,再插入,這在同一個事務中)
就需要自己構造Connection對象和Transaction,在上下文中傳遞來處理。
2:插入是否引發觸發器
通過SqlBulkCopyOptions.FireTriggers 引入
3:其它:批量數、超時時間、是否寫入主鍵ID。
可能引發的數據庫Down機的情況:
在歷史的過程中,我遇到過的一個大坑是:
當數據的長度過長,數據的字段過短,產生數據二進制截斷時,數據庫服務竟然停掉了(也許是特例,也許不是)。
所以小心使用,盡力做好對外部數據做好數據長度驗證。
2:MySql
關于MySql的批量,這是一段悲催的往事,有幾個坑,直到今天,才發現并解決了。
SQL語法篇:
LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE 'data.txt'
[REPLACE | IGNORE]
INTO TABLE tbl_name
[FIELDS
[TERMINATED BY 'string']
[[OPTIONALLY] ENCLOSED BY 'char']
[ESCAPED BY 'char' ]
]
[LINES
[STARTING BY 'string']
[TERMINATED BY 'string']
]
[IGNORE number LINES]
[(col_name_or_user_var,...)]
[SET col_name = expr,...)]
示例篇:
LOAD DATA LOCAL INFILE 'C:\\Users\\cyq\\AppData\\Local\\Temp\\BulkCopy.csv' INTO TABLE `BulkCopy` CHARACTER SET utf8 FIELDS TERMINATED BY '$,$' LINES TERMINATED BY '
' (`ID`,`Name`,`CreateTime`,`Sex`)
雖然MySql.Data.dll 提供了MySqlBulkLoader,但是看源碼只是生成了個Load Data 并用ADO.NET執行,
核心大坑的生成*.csv數據文件的竟然沒提供,所以自己生成語句并執行就好了,不需要用它。
C#代碼篇:
以下代碼摘自CYQ.Data,是一段今天才修正好的代碼:
private static string MDataTableToFile(MDataTable dt, boolkeepID, DalType dalType)
{string path = Path.GetTempPath() + dt.TableName + ".csv";using (StreamWriter sw = new StreamWriter(path, false, new UTF8Encoding(false)))
{
MCellStruct ms;stringvalue;foreach (MDataRow row indt.Rows)
{for (int i = 0; i < dt.Columns.Count; i++)
{#region 設置值ms=dt.Columns[i];if (!keepID &&ms.IsAutoIncrement)
{continue;
}else if (dalType == DalType.MySql &&row[i].IsNull)
{
sw.Write("\\N");//Mysql用\N表示null值。
}else{
value=row[i].ToString();if (ms.SqlType ==SqlDbType.Bit)
{int v = (value.ToLower() == "true" || value == "1") ? 1 : 0;if (dalType ==DalType.MySql)
{byte[] b = new byte[1];
b[0] = (byte)v;
value= System.Text.Encoding.UTF8.GetString(b);//mysql必須用字節存檔。
}else{
value=v.ToString();
}
}else{
value= value.Replace("\\", "\\\\");//處理轉義符號
}
sw.Write(value);
}if (i != dt.Columns.Count - 1)//不是最后一個就輸出
{
sw.Write(AppConst.SplitChar);
}#endregion}
sw.WriteLine();
}
}if (Path.DirectorySeparatorChar == '\\')
{
path= path.Replace(@"\", @"\\");
}returnpath;
}
以上代碼是產生一個csv文件,用于被調用,有兩個核心的坑,費了我不少時間:
1:Bit類型數據導不進去?
2:第1行數據自增ID被重置為1?
這兩個問題,網上搜不到答案,放縱到今天,覺的應該解決了,然后就把它解決了。
解決的思路是這樣的:
A:先用Load Data OutFile導出一個文件,再用Load Data InFile導入文件。
一開始我用記事本打開看了一下,又順手Ctrl+S了一下,結果發現問題和我的一樣,讓我懷疑竟然不支持?
直到今天,重新導出,中間不看了,直接導入,發現它竟然又正常的,于是,思維一轉:
B:把自己生成的文件和命令產生的文件,進行了十六進制比對,結果發現:
Bit類型自己生成的的數據:是0,1,在十六進制下顯示是30、31。
命令產生的數據在十六進制是00、01,查了下資料,發現MySql的Bit存檔的Bit是二進制。
于是,把0,1用字節表示,再轉字符串,再存檔,就好了。
于是這么一段代碼產生了(網上的DataTable轉CSV代碼都是沒處理的,都不知道他們是怎么跑的,難道都沒有定義Bit類型?):
if (ms.SqlType == SqlDbType.Bit)
{
int v = (value.ToLower() == "true" || value == "1") ? 1 : 0;
if (dalType == DalType.MySql)
{
byte[] b = new byte[1];
b[0] = (byte)v;
value = System.Text.Encoding.UTF8.GetString(b);//mysql必須用字節存檔。
}
else
{
value = v.ToString();
}
}
另外關于Null值,用\N表示。
解決完第一個問題,剩下就是第二個問題了,為什么第一個行代碼的主鍵會被置為1?
還是比對十六進制,結果驚人的發現:
是BOM頭,讓它錯識別了第一個主鍵值,所以被忽略主鍵,用了第1個自增值1替代了。
這也解釋了為什么只要重新保存的數據都有Bug的原因。
于是,解決的方法就是StreaWrite的時候,不生成BOM頭,怎么處理呢?
于是就有了以下的代碼:
using (StreamWriter sw = new StreamWriter(path, false, new UTF8Encoding(false)))
{
...................
}
通過New一個Encoding,并指定參數為false,替代我們常規的System.Text.Encoding.UTF8Encoding。
這些細節很隱秘,不說你都猜不道。。。
3:Oracle
SQL語法篇
LOAD[DATA]
[ { INFILE | INDDN } {file | * }
[STREAM | RECORD | FIXED length [BLOCKSIZE size]|
VARIABLE [length] ]
[ { BADFILE | BADDN } file ]
{DISCARDS | DISCARDMAX} integr ]
[ {INDDN | INFILE} . . . ]
[ APPEND | REPLACE | INSERT ]
[RECLENT integer]
[ { CONCATENATE integer |
CONTINUEIF { [THIS | NEXT] (start[: end])LAST }
Operator { 'string' | X 'hex' } } ]
INTO TABLE [user.]table
[APPEND | REPLACE|INSERT]
[WHEN condition [AND condition]...]
[FIELDS [delimiter] ]
(
column {
RECNUM | CONSTANT value |
SEQUENCE ( { integer | MAX |COUNT} [, increment] ) |
[POSITION ( { start [end] | * [ + integer] }
) ]
datatype
[TERMINATED [ BY ] {WHITESPACE| [X] 'character' } ]
[ [OPTIONALLY] ENCLOSE[BY] [X]'charcter']
[NULLIF condition ]
[DEFAULTIF condotion]
}
[ ,...]
)
以上配置存檔成一個CTL文件,再由以下的命令調用:
Sqlldr userid=用戶名/密碼@數據庫 control=文件名.ctl
C#語法篇:
.NET里大概有三種操作Oracle的手法:
1:System.Data.OracleClient (需要安裝客戶端)沒有帶批量方法(還區分x86和x64)。
2:Oracle.DataAccess ?(需要安裝客戶端)帶批量方法(也區分x86和x64)。
3:Oracle.ManagedDataAccess (不需要安裝客戶端)沒帶批量方法(不區分x86和x64,但僅支持.NET 4.0或以上)
Oracle.DataAccess 帶的批量方法叫:OracleBulkCopy,由于使用方式和SqlBulkCopy幾乎一致,就不介紹了。
如果調用程序所在的服務器安裝了Oracle客戶端,可以進行以下方法的調用:
流程如下:
1:產生*.cvs數據文件,見MySql中的代碼,一樣用的。
2:產生*.ctl控制文件,把生成的Load Data 語句存檔成一個*.ctl文件即可。
3:用sqlidr.exe執行CTL文件,這里悲催的一點是,不能用ADO.NET調用,只能用進程調用,所以,這個批量只能單獨使用。
調用進程的相關代碼:
bool hasSqlLoader = false;private boolHasSqlLoader() //檢測是否安裝了客戶端。
{
hasSqlLoader= false;
Process proc= newProcess();
proc.StartInfo.FileName= "sqlldr";
proc.StartInfo.CreateNoWindow= true;
proc.StartInfo.UseShellExecute= false;
proc.StartInfo.RedirectStandardOutput= true;
proc.OutputDataReceived+= newDataReceivedEventHandler(proc_OutputDataReceived);
proc.Start();
proc.BeginOutputReadLine();
proc.WaitForExit();returnhasSqlLoader;
}void proc_OutputDataReceived(objectsender, DataReceivedEventArgs e)
{if (!hasSqlLoader)
{
hasSqlLoader= e.Data.StartsWith("SQL*Loader:");
}
}//已經實現,但沒有事務,所以暫時先不引入。
private bool ExeSqlLoader(stringarg)
{try{
Process proc= newProcess();
proc.StartInfo.FileName= "sqlldr";
proc.StartInfo.Arguments=arg;
proc.Start();
proc.WaitForExit();return true;
}catch{
}return false;
}
總結:
隨著大數據的普及,數據間的批量移動必然越來頻繁的被涉及,所以不管是用SQL腳本,還是自己寫代碼,或是用DBImport工具,都將成必備技能之一了!
鑒于此,分享一下我在這一塊費過的力和填過的坑,供大伙參考!