bingo/sqlserver/Source/BingoFingerprints.cs (404 lines of code) (raw):

using System.IO; using System; using System.Collections.Generic; using System.Text; using System.Data; using System.Data.SqlClient; using System.Data.SqlTypes; namespace indigo { public class BingoFingerprints { private object _sync_object = new Object(); private BingoIndexData _index_data; private int _fp_bytes, _chunk_bytes; private int _fp_sub_bits_used = 10, _sim_screening_pass_mark = 128, _fp_sim_bits_group = 10; class Block { public Block (bool pending, int fp_bytes) { this.pending = pending; counters = new int[fp_bytes * 8]; } public int part = -1; public int minimum_index = int.MaxValue, maximum_index = int.MinValue; public List<int> indices = new List<int>(); public int[] counters; public List<byte[]> bits = null; // If not null then bits are in memory public bool pending = false; public void validateMinMax () { minimum_index = int.MaxValue; maximum_index = int.MinValue; foreach (int v in indices) validateMinMax(v); } public void validateMinMax (int value) { if (minimum_index > value) minimum_index = value; if (maximum_index < value) maximum_index = value; } }; List<Block> _all_blocks = new List<Block>(); public BingoFingerprints (BingoIndexData index_data) { _index_data = index_data; _fp_bytes = -1; _chunk_bytes = 8000; } public void createTables (SqlConnection conn) { BingoSqlUtils.ExecNonQuery(conn, @"CREATE TABLE {0} ( [part] INTEGER not null, [used] INTEGER not null, [mapping] VARBINARY(MAX), [counters] VARBINARY(MAX))", _index_data.fingerprintsTable); BingoSqlUtils.ExecNonQuery(conn, @"CREATE TABLE {0} ( [part] INTEGER not null, [bit] INTEGER not null, [bits_chunk] BINARY({1}))", _index_data.fingerprintBitsTable, _chunk_bytes); } public void createIndices (SqlConnection conn) { BingoSqlUtils.ExecNonQuery(conn, "ALTER TABLE {0} ADD PRIMARY KEY (part)", _index_data.fingerprintsTable); BingoSqlUtils.ExecNonQuery(conn, "ALTER TABLE {0} ADD PRIMARY KEY (part, bit)", _index_data.fingerprintBitsTable); BingoSqlUtils.ExecNonQuery(conn, "CREATE INDEX part ON {0}(part)", _index_data.fingerprintBitsTable); } public void dropTables (SqlConnection conn) { BingoSqlUtils.ExecNonQueryNoThrow(conn, "DROP TABLE {0}", _index_data.fingerprintsTable); BingoSqlUtils.ExecNonQueryNoThrow(conn, "DROP TABLE {0}", _index_data.fingerprintBitsTable); } public void init (SqlConnection conn) { // Check if blocks have already been loaded if (_all_blocks.Count != 0) return; lock (_sync_object) { // Check again if (_all_blocks.Count != 0) return; int? max_part = BingoSqlUtils.ExecIntQuery(conn, "SELECT MAX(part) from {0}", _index_data.fingerprintsTable); if (!max_part.HasValue) return; using (SqlCommand command = new SqlCommand("SELECT [part], [used], [counters], [mapping] from " + _index_data.fingerprintsTable + " ORDER BY [part]", conn)) { command.CommandTimeout = 3600; using (SqlDataReader reader = command.ExecuteReader()) { while (reader.Read()) { Block new_block = new Block(false, _fp_bytes); new_block.part = Convert.ToInt32(reader[0]); int used = Convert.ToInt32(reader[1]); byte[] counters = (byte[])reader[2]; byte[] mapping = (byte[])reader[3]; // Copy mapping int[] data = new int[used]; Buffer.BlockCopy(mapping, 0, data, 0, mapping.Length); new_block.indices.AddRange(data); new_block.validateMinMax(); // Copy counters Buffer.BlockCopy(counters, 0, new_block.counters, 0, counters.Length); _all_blocks.Add(new_block); } } } } } private void _validateBlockIndices (SqlConnection conn, Block block) { if (block.indices != null) return; lock (_sync_object) { if (block.indices != null) return; BingoLog.logMessage("validating fingerprint {0} block indices", block.part); using (SqlCommand command = new SqlCommand("SELECT [mapping], [used] from " + _index_data.fingerprintsTable + " WHERE [part] = " + block.part, conn)) { command.CommandTimeout = 3600; using (SqlDataReader reader = command.ExecuteReader()) { if (!reader.Read()) throw new Exception("Cannot read fingerprint " + block.part + " block"); byte[] mapping = (byte[])reader[0]; int used = Convert.ToInt32(reader[1]); // Copy mapping int[] data = new int[used]; Buffer.BlockCopy(mapping, 0, data, 0, mapping.Length); block.indices = new List<int>(); block.indices.AddRange(data); block.validateMinMax(); } } } } private void _allocatePendingBlock (SqlConnection conn) { Block block = null; if (_all_blocks.Count == 0) { block = new Block(false, _fp_bytes); _all_blocks.Add(block); } if (block == null) { Block last = _all_blocks[_all_blocks.Count - 1]; _validateBlockIndices(conn, last); if (last.indices.Count == _chunk_bytes * 8) { flush(conn); block = new Block(false, _fp_bytes); _all_blocks.Add(block); } else block = last; } if (block.pending) return; _readBlockBits(ref block, conn); block.pending = true; } private void _readBlockBits (ref Block block, SqlConnection conn) { if (block.bits != null) return; // Bits have already been loaded BingoLog.logMessage("_readBlockBits: allocating block {1} buffer {0} size...", 8 * _fp_bytes * _chunk_bytes, _all_blocks.Count); block.bits = new List<byte[]>(8 * _fp_bytes); for (int i = 0; i < 8 * _fp_bytes; i++) block.bits.Add(new byte[_chunk_bytes]); BingoLog.logMessage(" Done."); string command_text = String.Format("SELECT bit, bits_chunk from {0} where part = {1}", _index_data.fingerprintBitsTable, block.part); using (SqlCommand command = new SqlCommand(command_text, conn)) { command.CommandTimeout = 3600; using (SqlDataReader reader = command.ExecuteReader()) { while (reader.Read()) { int bit = (int)reader[0]; byte[] bit_chunk = (byte[])reader[1]; if (bit_chunk.Length != _chunk_bytes) throw new Exception("Bits block length is incorrect"); bit_chunk.CopyTo(block.bits[bit], 0); } } } } public void addFingerprint (SqlConnection conn, byte[] fp, int id) { lock (_sync_object) { _allocatePendingBlock(conn); Block last = _all_blocks[_all_blocks.Count - 1]; last = _all_blocks[_all_blocks.Count - 1]; _validateBlockIndices(conn, last); int index = last.indices.Count; last.indices.Add(id); last.validateMinMax(id); for (int i = 0; i < _fp_bytes; i++) { byte cur_byte = fp[i]; for (int j = 0; j < 8; j++) { int bit = 8 * i + j; //int offset_chunk = () * _chunk_bytes; int offset_bytes = index / 8; int offset_bit = index % 8; if ((cur_byte & (1 << j)) == 0) { last.bits[bit][offset_bytes] &= (byte)(~(1 << offset_bit)); } else { last.counters[bit]++; last.bits[bit][offset_bytes] |= (byte)(1 << offset_bit); } } } } } public bool needFlush() { lock (_sync_object) { foreach (Block b in _all_blocks) { if (!b.pending || b.indices == null || b.indices.Count == 0) continue; return true; } return false; } } public void flush (SqlConnection conn) { lock (_sync_object) { foreach (Block b in _all_blocks) { if (!b.pending || b.indices == null || b.indices.Count == 0) continue; _flushBlock(b, conn); } } } private void _flushBlock (Block block, SqlConnection conn) { BingoTimer timer = new BingoTimer("fingerprints.flush"); if (block.part == -1) { // Add new block int? max_id = BingoSqlUtils.ExecIntQuery(conn, "SELECT MAX(part) from {0}", _index_data.fingerprintsTable); if (max_id == null) max_id = 0; block.part = max_id.Value + 1; BingoSqlUtils.ExecNonQuery(conn, "INSERT INTO {0} values ({1}, 0, null, null)", _index_data.fingerprintsTable, block.part); } BingoLog.logMessage("Flushing fingerprints block {0}...", block.part); // Update used column and counters column string update_command_text = String.Format(@"UPDATE {0} SET used = @used, counters = @counters, mapping = @mapping where part = {1}", _index_data.fingerprintsTable, block.part); using (SqlCommand command = new SqlCommand(update_command_text, conn)) { command.CommandTimeout = 3600; command.Parameters.AddWithValue("@used", block.indices.Count); byte[] countes_bytes = new byte[8 * _fp_bytes * sizeof(int)]; Buffer.BlockCopy(block.counters, 0, countes_bytes, 0, countes_bytes.Length); SqlBinary countes = new SqlBinary(countes_bytes); command.Parameters.AddWithValue("@counters", countes); byte[] mapping_bytes = new byte[block.indices.Count * sizeof(int)]; Buffer.BlockCopy(block.indices.ToArray(), 0, mapping_bytes, 0, mapping_bytes.Length); SqlBinary mapping = new SqlBinary(mapping_bytes); command.Parameters.AddWithValue("@mapping", mapping); command.ExecuteNonQuery(); } // Update bit chunks BingoSqlUtils.ExecNonQuery(conn, "DELETE FROM {0} WHERE part = {1}", _index_data.fingerprintBitsTable, block.part); string update_bits_text = String.Format(@"INSERT INTO {0} VALUES ({1}, @bit, @bit_chunk)", _index_data.fingerprintBitsTable, block.part); using (SqlCommand command = new SqlCommand(update_bits_text, conn)) { command.CommandTimeout = 3600; command.Parameters.Add("@bit", SqlDbType.Int); command.Parameters.Add("@bit_chunk", SqlDbType.Binary); byte[] chunk = new byte[_chunk_bytes]; for (int i = 0; i < 8 * _fp_bytes; i++) { command.Parameters["@bit"].Value = i; Buffer.BlockCopy(block.bits[i], 0, chunk, 0, chunk.Length); SqlBinary sql_chunk = new SqlBinary(chunk); command.Parameters["@bit_chunk"].Value = sql_chunk; command.ExecuteNonQuery(); } } block.pending = false; block.bits = null; block.indices = null; BingoLog.logMessage(" Done."); timer.end(); } public bool ableToScreen (byte[] fp) { foreach (byte b in fp) if (b != 0) return true; return false; } public IEnumerable<int> screenSub (SqlConnection conn, byte[] fp, int? next_after_storate_id) { screenInBlockDelegate screenBlockSub = new screenInBlockDelegate( (List<int> fp_ones, Block block, byte[] chunk, byte[] chunk2, SqlConnection conn2) => _screenInBlockSub(fp_ones, block, chunk, chunk2, conn2, next_after_storate_id)); return _screen(conn, fp, next_after_storate_id, screenBlockSub); } // Get bounds on the number of bits public delegate void getBoundsDelegate (IList<int> storage_id, ref int[] min_common_ones, ref int[] max_common_ones, SqlConnection conn); public IEnumerable<int> screenSim (SqlConnection conn, byte[] fp, int? next_after_storate_id, getBoundsDelegate boundsDelegate) { screenInBlockDelegate screenBlockSim = new screenInBlockDelegate( (List<int> fp_ones, Block block, byte[] chunk, byte[] chunk2, SqlConnection conn2) => _screenInBlockSim(fp_ones, block, chunk, chunk2, conn2, boundsDelegate, next_after_storate_id)); return _screen(conn, fp, next_after_storate_id, screenBlockSim); } // Delegate for screening in block private delegate List<int> screenInBlockDelegate (List<int> fp_ones, Block block, byte[] chunk, byte[] chunk2, SqlConnection conn); private IEnumerable<int> _screen (SqlConnection conn, byte[] fp, int? next_after_storate_id, screenInBlockDelegate screenInBlockFunc) { // Find ones List<int> fp_ones = new List<int>(); for (int i = 0; i < _fp_bytes; i++) { byte b = fp[i]; for (int j = 0; j < 8; j++) if ((b & (1 << j)) != 0) fp_ones.Add(8 * i + j); } byte[] working_chunk1 = new byte[_chunk_bytes]; byte[] working_chunk2 = new byte[_chunk_bytes]; // Screen foreach (Block block in _all_blocks) { List<int> results = null; _validateBlockIndices(conn, block); results = screenInBlockFunc(fp_ones, block, working_chunk1, working_chunk2, conn); foreach (int index in results) yield return index; } } private void _getBitChunk (Block block, int bit_index, SqlConnection conn, ref byte[] chunk) { BingoTimer timer = new BingoTimer("fingerprints.read"); if (block.bits != null) Buffer.BlockCopy(block.bits[bit_index], 0, chunk, 0, chunk.Length); else { chunk = (byte[])BingoSqlUtils.ExecObjQuery(conn, "SELECT bits_chunk from {0} where part = {1} and bit = {2}", _index_data.fingerprintBitsTable, block.part, bit_index); } timer.end(); } private List<int> _screenInBlockSub (List<int> fp_ones, Block block, byte[] chunk, byte[] chunk2, SqlConnection conn, int? next_after_storate_id) { List<int> results = new List<int>(); if (next_after_storate_id.HasValue && block.maximum_index < next_after_storate_id.Value) return results; int min_storate_id_bound = -1; if (next_after_storate_id.HasValue) min_storate_id_bound = next_after_storate_id.Value; // Sort ones fp_ones.Sort( (i1, i2) => block.counters[i1].CompareTo(block.counters[i2])); if (fp_ones.Count == 0) throw new Exception("Internal error: ableToScreen wasn't checked"); BingoTimer timer = new BingoTimer("fingerprints.screening_sub"); BingoCore.lib.bingoProfIncCounter("fingerprints.bits_total", fp_ones.Count); List<int> fp_ones_used = new List<int>(); for (int i = 0; i < _fp_sub_bits_used; i++) { if (i >= fp_ones.Count) break; fp_ones_used.Add(fp_ones[i]); } int iteration = 0; foreach (BitChunk bit_chunk in bitChunksReaderGrouped(conn, block, fp_ones_used, _fp_sub_bits_used)) { if (iteration == 0) { bit_chunk.chunk.CopyTo(chunk, 0); iteration++; continue; } else bit_chunk.chunk.CopyTo(chunk2, 0); iteration++; bool has_nonzero = false; for (int i = 0; i < chunk.Length; i++) { chunk[i] &= chunk2[i]; if (chunk[i] != 0) has_nonzero = true; } if (!has_nonzero) break; } BingoCore.lib.bingoProfIncCounter("fingerprints.bits_used", iteration); int max_byte_index = (block.indices.Count + 7) / 8; for (int i = 0; i < max_byte_index; i++) { byte b = chunk[i]; if (b == 0) continue; for (int j = 0; j < 8; j++) if ((b & (1 << j)) != 0) { int id = block.indices[8 * i + j]; if (id <= min_storate_id_bound) continue; results.Add(id); } } timer.end(); return results; } private List<int> _screenInBlockSim (List<int> fp_ones, Block block, byte[] chunk, byte[] chunk2, SqlConnection conn, getBoundsDelegate boundsDelegate, int? next_after_storate_id) { List<int> passed_screening = new List<int>(); if (next_after_storate_id.HasValue && block.maximum_index < next_after_storate_id.Value) return passed_screening; int min_storate_id_bound = -1; if (next_after_storate_id.HasValue) min_storate_id_bound = next_after_storate_id.Value; BingoTimer timer = new BingoTimer("fingerprints.screening_sim"); int[] max_common_ones = new int[block.indices.Count]; int[] min_common_ones = new int[block.indices.Count]; int[] one_counters = new int[block.indices.Count]; // Calculate max and min bounds BingoTimer timer2 = new BingoTimer("fingerprints.screening_bounds"); boundsDelegate(block.indices, ref min_common_ones, ref max_common_ones, conn); timer2.end(); List<int> passed_screening_tmp = new List<int>(); BingoCore.lib.bingoProfIncCounter("fingerprints.bits_total", fp_ones.Count); if (fp_ones.Count == 0) { for (int i = 0; i < block.indices.Count; i++) { if (block.indices[i] <= min_storate_id_bound) continue; if (min_common_ones[i] == 0) passed_screening.Add(i); } timer.end(); return passed_screening; } int iteration = 0; foreach (BitChunk bit_chunk in bitChunksReaderGrouped(conn, block, fp_ones, _fp_sim_bits_group)) { chunk = bit_chunk.chunk; BingoTimer timer3 = new BingoTimer("fingerprints.screening_one_counters"); // Calculate ones count int max_byte_index = (block.indices.Count + 7) / 8; for (int i = 0; i < max_byte_index; i++) { byte b = chunk[i]; if (b == 0) continue; for (int j = 0; j < 8; j++) if ((b & (1 << j)) != 0) one_counters[8 * i + j]++; } timer3.end(); BingoTimer timer4 = new BingoTimer("fingerprints.screening_process"); if (iteration == 0) { for (int i = 0; i < block.indices.Count; i++) { if (block.indices[i] <= min_storate_id_bound) continue; int min_possible_ones = one_counters[i]; int max_possible_ones = one_counters[i] + fp_ones.Count; if (min_possible_ones <= max_common_ones[i] && max_possible_ones >= min_common_ones[i]) passed_screening.Add(i); } } else { passed_screening_tmp.Clear(); foreach (int i in passed_screening) { int min_possible_ones = one_counters[i]; int max_possible_ones = one_counters[i] + fp_ones.Count - iteration; if (min_possible_ones <= max_common_ones[i] && max_possible_ones >= min_common_ones[i]) passed_screening_tmp.Add(i); } // Swap then List<int> tmp = passed_screening; passed_screening = passed_screening_tmp; passed_screening_tmp = tmp; } timer4.end(); iteration++; if (passed_screening.Count < _sim_screening_pass_mark) break; } BingoCore.lib.bingoProfIncCounter("fingerprints.bits_used", iteration); for (int i = 0; i < passed_screening.Count; i++) passed_screening[i] = block.indices[passed_screening[i]]; timer.end(); return passed_screening; } public void syncContextParameters (bool is_reaction) { lock (_sync_object) { if (is_reaction) _fp_bytes = BingoCore.getConfigInt("reaction-fp-size-bytes"); else _fp_bytes = BingoCore.getConfigInt("fp-size-bytes"); _fp_sub_bits_used = BingoCore.getConfigInt("SUB_SCREENING_MAX_BITS"); _sim_screening_pass_mark = BingoCore.getConfigInt("SIM_SCREENING_PASS_MARK"); } } class BitChunk { public int bit_index; public byte[] chunk; } IEnumerable<BitChunk> bitChunksReaderGrouped (SqlConnection conn, Block block, List<int> bits, int size) { int offset = 0; while (offset < bits.Count) { int cur_size = size; if (offset + cur_size >= bits.Count) cur_size = bits.Count - offset; StringBuilder indices = new StringBuilder(); for (int i = offset; i < offset + cur_size; i++) { if (indices.Length != 0) indices.Append(", "); indices.AppendFormat("{0}", bits[i]); } string command_text = String.Format("select bit, bits_chunk from {0} where part={1} and bit in ({2})", _index_data.fingerprintBitsTable, block.part, indices.ToString()); using (SqlCommand command = new SqlCommand(command_text, conn)) { command.CommandTimeout = 3600; BingoTimer timer = new BingoTimer("fingerprints.read_grouped_exec"); using (SqlDataReader reader = command.ExecuteReader()) { timer.end(); while (true) { timer = new BingoTimer("fingerprints.read_grouped_read"); bool ret = reader.Read(); if (!ret) break; BitChunk bit_chunk = new BitChunk(); int bit = (int)reader[0]; bit_chunk.bit_index = bit; bit_chunk.chunk = (byte[])reader[1]; timer.end(); yield return bit_chunk; } } } offset += size; } } } }