bingo/sqlserver/Source/BingoStorage.cs (422 lines of code) (raw):

using System.IO; using System; using System.Collections.Generic; using System.Collections; using System.Text; using System.Data; using System.Data.SqlClient; using System.Data.SqlTypes; using System.Runtime.Serialization.Formatters.Binary; namespace indigo { public class BingoStorage { object _sync_object = new Object(); BingoIndexData _index_data; const int MAX_BLOCK_SIZE = 8 * 1000 * 1000; class _Block { public int block_index; public byte[] data; public int first_index, end_index; public int[] offsets; public short[] lengths; public List<byte> pending_data; public List<int> pending_offsets; public List<short> pending_lengths; public bool dirty = false; } List<_Block> _blocks = new List<_Block>(); int total_items = -1; public BingoStorage (BingoIndexData index_data) { _index_data = index_data; } public IEnumerable<int> enumerateStorageIds(int? next_after_storate_id) { int i = 0; if (next_after_storate_id.HasValue) i = next_after_storate_id.Value + 1; for (; i < total_items; i++) yield return i; } public void createTables (SqlConnection conn) { BingoSqlUtils.ExecNonQuery(conn, @"CREATE TABLE {0} ( [id] SMALLINT PRIMARY KEY, [first_index] INT, [count] INT, [offsets] VARBINARY(MAX), [lengths] VARBINARY(MAX), [data] VARBINARY(MAX))", _index_data.storageTable); } public void dropTables (SqlConnection conn) { BingoSqlUtils.ExecNonQueryNoThrow(conn, "DROP TABLE {0}", _index_data.storageTable); } private bool _blockLoaded (short block_index) { if (_blocks.Count > block_index) { _Block b = _blocks[block_index]; if (b == null) return false; return true; } return false; } private void _validateBlock (short block_index, SqlConnection conn) { if (_blockLoaded(block_index)) return; lock (_sync_object) { // Double-checked locking if (_blockLoaded(block_index)) return; BingoTimer timer = new BingoTimer("storage.validate_block"); BingoLog.logMessage("Loading storage block {0} for table {1}...", block_index, _index_data.id.InformationName()); string text = "SELECT [data], [first_index], [offsets], [lengths], [count] from " + _index_data.storageTable + " where id = " + block_index; using (SqlCommand command = new SqlCommand(text, conn)) { command.CommandTimeout = 3600; using (SqlDataReader reader = command.ExecuteReader()) { if (!reader.Read()) throw new Exception("Block cannot be found"); while (_blocks.Count <= block_index) _blocks.Add(new _Block()); if (_blocks[block_index] == null) _blocks[block_index] = new _Block(); _Block block = _blocks[block_index]; block.block_index = block_index; block.data = (byte[])reader["data"]; block.first_index = (int)reader["first_index"]; int count = (int)reader["count"]; block.offsets = new int[count]; MemoryStream mem_stream = new MemoryStream((byte[])reader["offsets"]); BinaryReader bin_reader = new BinaryReader(mem_stream); for (int i = 0; i < count; i++) block.offsets[i] = bin_reader.ReadInt32(); block.lengths = new short[count]; mem_stream = new MemoryStream((byte[])reader["lengths"]); bin_reader = new BinaryReader(mem_stream); for (int i = 0; i < count; i++) block.lengths[i] = bin_reader.ReadInt16(); block.end_index = block.first_index + count; } } BingoLog.logMessage(" Done."); timer.end(); } } private void _validateLastBlock (SqlConnection conn) { _validateBlock((short)(_blocks.Count - 1), conn); } public void validate (SqlConnection conn) { if (total_items >= 0) return; // Index already loaded lock (_sync_object) { if (total_items >= 0) return; // Index already loaded int? ret = BingoSqlUtils.ExecIntQuery(conn, "select max(id) from {0}", _index_data.storageTable); if (ret.HasValue) { while (_blocks.Count <= ret.Value) _blocks.Add(null); } int? sum = BingoSqlUtils.ExecIntQuery(conn, "select sum(count) from {0}", _index_data.storageTable); if (!sum.HasValue) total_items = 0; else total_items = sum.Value; } } public bool needFlush() { lock (_sync_object) { foreach (_Block b in _blocks) if (b != null && b.dirty) return true; return false; } } public void flush (SqlConnection conn) { lock (_sync_object) { for (int i = 0; i < _blocks.Count; i++) { _Block b = _blocks[i]; if (b != null) { if (b.dirty) { _flushBlock(conn, b); BingoLog.logMessage("Disposing memory for block {0}...", b.block_index); _blocks[i] = null; BingoLog.logMessage(" Done"); } } } } } private void _flushBlock (SqlConnection conn, _Block b) { BingoLog.logMessage("Flushing storage block {0}", b.block_index); BingoSqlUtils.ExecNonQueryNoThrow(conn, "DELETE FROM {0} WHERE id={1}", _index_data.storageTable, b.block_index); string text = String.Format(@"INSERT INTO {0} ([id], [first_index], [count], [offsets], [lengths], [data]) VALUES ({1}, {2}, {3}, @offsets, @lengths, @data)", _index_data.storageTable, b.block_index, b.first_index, b.end_index - b.first_index); using (SqlCommand command = new SqlCommand(text, conn)) { command.CommandTimeout = 3600; byte[] data; if (b.pending_data != null) data = b.pending_data.ToArray(); else data = b.data; SqlBinary binary_data = new SqlBinary(data); command.Parameters.AddWithValue("@data", binary_data); ICollection<int> offsets; if (b.pending_offsets != null) offsets = b.pending_offsets; else offsets = b.offsets; MemoryStream mem_stream = new MemoryStream(offsets.Count * 4); BinaryWriter writer = new BinaryWriter(mem_stream); foreach (int offset in offsets) writer.Write(offset); byte[] buffer = mem_stream.GetBuffer(); command.Parameters.AddWithValue("@offsets", buffer); ICollection<short> lengths; if (b.pending_lengths != null) lengths = b.pending_lengths; else lengths = b.lengths; mem_stream = new MemoryStream(offsets.Count * 2); writer = new BinaryWriter(mem_stream); foreach (short length in lengths) writer.Write(length); buffer = mem_stream.GetBuffer(); command.Parameters.AddWithValue("@lengths", buffer); command.ExecuteNonQuery(); } _convertPendingBlockToNormal(b); } public int add (byte[] data, SqlConnection conn) { lock (_sync_object) { _Block b = _getPendingBlock(data.Length, conn); // Add one byte for remove mark b.pending_data.Add(1); b.pending_offsets.Add(b.pending_data.Count); if (data.Length > short.MaxValue) throw new Exception("Data length is to long. Unexpected."); b.pending_lengths.Add((short)data.Length); b.pending_data.AddRange(data); total_items++; b.end_index++; b.dirty = true; return total_items - 1; } } private _Block _getPendingBlock (int data_length, SqlConnection conn) { bool need_new_block = false; if (_blocks.Count == 0) need_new_block = true; else { _Block last = _blocks[_blocks.Count - 1]; if (last == null) _validateBlock((short)(_blocks.Count - 1), conn); last = _blocks[_blocks.Count - 1]; ICollection collection = null; if (last.data != null) collection = last.data; else collection = last.pending_data; if (collection.Count + data_length > MAX_BLOCK_SIZE) need_new_block = true; } _Block block = null; if (need_new_block) { // Flush all dirty blocks and dispose memory flush(conn); _blocks.Add(new _Block()); block = _blocks[_blocks.Count - 1]; block.block_index = _blocks.Count - 1; if (_blocks.Count == 1) block.first_index = 0; else block.first_index = total_items; block.end_index = block.first_index; block.dirty = true; } else block = _blocks[_blocks.Count - 1]; _convertBlockToPendingBlock(block); return block; } private void _convertBlockToPendingBlock (_Block block) { if (block.pending_data != null) return; block.pending_data = new List<byte>(); if (block.data != null) block.pending_data.AddRange(block.data); block.data = null; block.pending_offsets = new List<int>(); if (block.offsets != null) block.pending_offsets.AddRange(block.offsets); block.offsets = null; block.pending_lengths = new List<short>(); if (block.lengths != null) block.pending_lengths.AddRange(block.lengths); block.lengths = null; } private void _convertPendingBlockToNormal (_Block block) { if (block.data != null) return; block.data = block.pending_data.ToArray(); block.pending_data = null; block.offsets = block.pending_offsets.ToArray(); block.pending_offsets = null; block.lengths = block.pending_lengths.ToArray(); block.lengths = null; } private _Block _getBlockByIndex (SqlConnection conn, int index, ref int cache_index) { if (cache_index >= _blocks.Count || cache_index < 0) cache_index = 0; _validateBlock((short)cache_index, conn); _Block b = _blocks[cache_index]; if (index < b.first_index || index >= b.end_index) { // Find another block cache_index = -1; do { cache_index++; if (cache_index >= _blocks.Count) throw new Exception( String.Format("index {0} wasn't found in the storage", index)); _validateBlock((short)cache_index, conn); b = _blocks[cache_index]; } while (index < b.first_index || index >= b.end_index); } return b; } // If length if -1 then all rest data is returned private void _getDataWithOffset(int index, int offset, int length, out byte[] dest_data, out int dest_offset, out int dest_len, SqlConnection conn, ref int cache_index) { _Block b = _getBlockByIndex(conn, index, ref cache_index); int sub_offset = index - b.first_index; if (length == -1) { if (b.lengths != null) length = b.lengths[sub_offset] - offset; else length = b.pending_lengths[sub_offset] - offset; } int data_offset; if (b.offsets != null) data_offset = b.offsets[sub_offset]; else data_offset = b.pending_offsets[sub_offset]; if (b.data != null) { dest_data = b.data; dest_len = length; dest_offset = data_offset + offset; } else { dest_data = new byte[length]; b.pending_data.CopyTo(data_offset + offset, dest_data, 0, dest_data.Length); dest_len = length; dest_offset = 0; } } // If length if -1 then all rest data is returned public byte[] get(int index, int offset, int length, SqlConnection conn, ref int cache_index) { byte[] all_data; int data_offset, data_length; _getDataWithOffset(index, offset, length, out all_data, out data_offset, out data_length, conn, ref cache_index); if (data_offset == 0 && data_length == all_data.Length) // Copy isn't necessary return all_data; byte[] data = new byte[data_length]; Array.Copy(all_data, data_offset, data, 0, data.Length); return data; } public int getInt(int index, int offset, SqlConnection conn, ref int cache_index) { byte[] all_data; int data_offset, data_length; _getDataWithOffset(index, offset, 4, out all_data, out data_offset, out data_length, conn, ref cache_index); return BitConverter.ToInt32(all_data, data_offset); } public short getShort(int index, int offset, SqlConnection conn, ref int cache_index) { byte[] all_data; int data_offset, data_length; _getDataWithOffset(index, offset, 2, out all_data, out data_offset, out data_length, conn, ref cache_index); return BitConverter.ToInt16(all_data, data_offset); } public byte getByte (int index, int offset, SqlConnection conn, ref int cache_index) { byte[] all_data; int data_offset, data_length; _getDataWithOffset(index, offset, 1, out all_data, out data_offset, out data_length, conn, ref cache_index); return all_data[data_offset]; } public bool isDeleted (int storage_id, SqlConnection conn, ref int cache_index) { return getByte(storage_id, -1, conn, ref cache_index) == 0; } public void deleteRecord (int storage_id, SqlConnection conn) { lock (_sync_object) { int cache_index = 0; _Block b = _getBlockByIndex(conn, storage_id, ref cache_index); int data_offset; if (b.offsets != null) data_offset = b.offsets[storage_id - b.first_index]; else data_offset = b.pending_offsets[storage_id - b.first_index]; if (b.data != null) b.data[data_offset - 1] = 0; else if (b.pending_data != null) b.pending_data[data_offset - 1] = 0; else throw new Exception("Internal error: cannot delete record"); b.dirty = true; } } } }