bingo/sqlserver/Source/RingoIndexData.cs (126 lines of code) (raw):

using System;
using System.Collections.Generic;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;

namespace indigo
{
    /// <summary>
    /// Index data for reaction indices (<see cref="IndexType.Reaction"/>).
    /// On top of what the base class maintains, it owns a "shadow" table with the
    /// columns (id, storage_id, crf, hash). Rows for that table are buffered in an
    /// in-memory <see cref="DataTable"/> and written out in batches.
    /// </summary>
    public class RingoIndexData : BingoIndexData
    {
        // Number of buffered rows at which addToShadowTable flushes before adding more.
        private const int ShadowFlushThreshold = 10000;

        // Timeout, in seconds, for the bulk copy and for each in-context INSERT.
        private const int FlushTimeoutSeconds = 3600;

        // Guards shadow_datatable; every access to the buffer happens under this lock.
        readonly object _sync_object = new Object();

        // Buffered shadow rows; created lazily on the first addToShadowTable call.
        DataTable shadow_datatable = null;

        /// <summary>Forwards all arguments unchanged to the base constructor.</summary>
        public RingoIndexData (BingoIndexID id, string id_column, string data_column, string bingo_schema)
            : base(id, id_column, data_column, bingo_schema)
        {
        }

        /// <summary>Always returns <see cref="IndexType.Reaction"/>.</summary>
        public override IndexType getIndexType ()
        {
            return IndexType.Reaction;
        }

        /// <summary>Creates the base tables and then the shadow table.</summary>
        /// <param name="conn">Connection the DDL is executed on.</param>
        public override void CreateTables (SqlConnection conn)
        {
            base.CreateTables(conn);

            StringBuilder cmd = new StringBuilder();

            // Create shadow table
            cmd.AppendFormat(@"CREATE TABLE {0} (id int not null, storage_id int not null, crf varbinary(max), hash int not null)", shadowTable);

            BingoSqlUtils.ExecNonQuery(conn, cmd.ToString());
        }

        /// <summary>
        /// Adds the shadow table's primary key (storage_id), a unique index on id and a
        /// non-unique index on hash.
        /// </summary>
        /// <param name="conn">Connection the DDL is executed on.</param>
        public override void createIndices (SqlConnection conn)
        {
            BingoSqlUtils.ExecNonQuery(conn,
                "ALTER TABLE {0} ADD PRIMARY KEY (storage_id)", shadowTable);
            BingoSqlUtils.ExecNonQuery(conn,
                "CREATE UNIQUE INDEX id ON {0}(id)", shadowTable);
            BingoSqlUtils.ExecNonQuery(conn,
                "CREATE INDEX hash ON {0}(hash)", shadowTable);
        }

        /// <summary>Drops the base tables and then the shadow table (via the NoThrow helper).</summary>
        /// <param name="conn">Connection the DDL is executed on.</param>
        public override void DropTables (SqlConnection conn)
        {
            base.DropTables(conn);

            BingoSqlUtils.ExecNonQueryNoThrow(conn, "DROP TABLE " + shadowTable);
        }

        /// <summary>
        /// Buffers one shadow row. If the buffer already holds
        /// <see cref="ShadowFlushThreshold"/> rows or more, it is flushed to the
        /// database first.
        /// </summary>
        /// <param name="conn">Connection used if a flush is triggered.</param>
        /// <param name="index">Source of the row's <c>crf</c> and <c>hash</c> values.</param>
        /// <param name="id">Value for the <c>id</c> column.</param>
        /// <param name="storage_id">Value for the <c>storage_id</c> column.</param>
        public void addToShadowTable (SqlConnection conn, RingoIndex index, int id, int storage_id)
        {
            lock (_sync_object)
            {
                if (shadow_datatable == null)
                {
                    _createDataTable();
                }

                if (shadow_datatable.Rows.Count >= ShadowFlushThreshold)
                {
                    _flushShadowTable(conn);
                }

                DataRow shadow_row = shadow_datatable.NewRow();
                shadow_row["id"] = id;
                shadow_row["storage_id"] = storage_id;
                shadow_row["crf"] = index.crf;
                shadow_row["hash"] = index.hash;

                shadow_datatable.Rows.Add(shadow_row);
            }
        }

        /// <summary>Creates the in-memory buffer with the same four columns as the shadow table.</summary>
        private void _createDataTable ()
        {
            shadow_datatable = new DataTable();
            DataColumnCollection sc = shadow_datatable.Columns;

            sc.Add(new DataColumn("id", typeof(int)));
            sc.Add(new DataColumn("storage_id", typeof(int)));
            // Same column type the string-based lookup "System.Array" resolved to.
            sc.Add(new DataColumn("crf", typeof(Array)));
            sc.Add(new DataColumn("hash", typeof(int)));
        }

        /// <summary>
        /// Returns true when the base class reports pending data or the shadow buffer
        /// contains at least one row.
        /// </summary>
        public override bool needFlush ()
        {
            lock (_sync_object)
            {
                if (base.needFlush())
                {
                    return true;
                }

                return shadow_datatable != null && shadow_datatable.Rows.Count > 0;
            }
        }

        /// <summary>Flushes the base class data and then the shadow buffer.</summary>
        /// <param name="conn">Connection the data is written through.</param>
        public override void flush (SqlConnection conn)
        {
            lock (_sync_object)
            {
                base.flush(conn);
                _flushShadowTable(conn);
            }
        }

        /// <summary>
        /// Writes all buffered shadow rows to the shadow table and empties the buffer.
        /// Does nothing when the buffer is missing or empty. Callers hold
        /// <c>_sync_object</c>.
        /// </summary>
        /// <param name="conn">Connection the rows are written through.</param>
        private void _flushShadowTable (SqlConnection conn)
        {
            if (shadow_datatable == null || shadow_datatable.Rows.Count == 0)
            {
                return;
            }

            if (conn.ConnectionString == "context connection=true")
            {
                // SqlBulkCopy cannot be used in the context connection
                _flushShadowTableInContext(conn);
                return;
            }

            BingoTimer timer = new BingoTimer("shadow_table.flush");

            using (SqlTransaction transaction = conn.BeginTransaction())
            {
                // Copy shadow table
                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.TableLock, transaction))
                {
                    bulkCopy.DestinationTableName = shadowTable;
                    foreach (DataColumn dc in shadow_datatable.Columns)
                    {
                        bulkCopy.ColumnMappings.Add(dc.ColumnName, dc.ColumnName);
                    }

                    bulkCopy.BatchSize = shadow_datatable.Rows.Count;
                    bulkCopy.BulkCopyTimeout = FlushTimeoutSeconds;
                    bulkCopy.WriteToServer(shadow_datatable);
                }

                transaction.Commit();
            }

            // Empty the buffer only after the commit succeeded: if the commit throws,
            // the transaction is rolled back and the rows must still be in memory.
            shadow_datatable.Rows.Clear();

            timer.end();
        }

        /// <summary>
        /// Row-by-row fallback used on the context connection: issues one INSERT per
        /// buffered row and then empties the buffer.
        /// </summary>
        /// <param name="conn">The context connection.</param>
        private void _flushShadowTableInContext (SqlConnection conn)
        {
            // Only the table name is formatted into the text. All values travel as
            // parameters, so the statement does not depend on the current culture's
            // number formatting (hash can be negative).
            string insert_text = String.Format(
                "INSERT INTO {0} VALUES (@id, @storage_id, @crf, @hash)", shadowTable);

            foreach (DataRow row in shadow_datatable.Rows)
            {
                using (SqlCommand cmd = new SqlCommand())
                {
                    cmd.CommandTimeout = FlushTimeoutSeconds;
                    cmd.Connection = conn;
                    cmd.CommandText = insert_text;

                    cmd.Parameters.AddWithValue("@id", row["id"]);
                    cmd.Parameters.AddWithValue("@storage_id", row["storage_id"]);
                    cmd.Parameters.AddWithValue("@crf", new SqlBinary((byte[])row["crf"]));
                    cmd.Parameters.AddWithValue("@hash", row["hash"]);

                    cmd.ExecuteNonQuery();
                }
            }

            shadow_datatable.Rows.Clear();
        }

        /// <summary>
        /// Flushes the buffered shadow rows to the database.
        /// NOTE(review): presumably so a record about to be deleted is already present
        /// in the shadow table; confirm against the callers.
        /// </summary>
        /// <param name="conn">Connection the rows are written through.</param>
        public override void prepareForDeleteRecord (SqlConnection conn)
        {
            lock (_sync_object)
            {
                _flushShadowTable(conn);
            }
        }
    }
}