Remove duplicate host from data structure, fix port searching bug

This commit is contained in:
Michael Mikovsky
2025-04-17 08:23:43 -06:00
parent 556e6650d0
commit f927719b06
5 changed files with 118 additions and 404 deletions
+9 -311
View File
@@ -1,20 +1,11 @@
use std::{
collections::HashSet,
io::Error,
net::IpAddr,
sync::{Arc, Mutex},
time::Instant,
};
use std::{net::IpAddr, sync::Arc, time::Instant};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use memchr::memmem;
use rocksdb::{Cache, ColumnFamily, DB, IteratorMode, Options, ReadOptions, WriteBatch};
use rocksdb::{Cache, ColumnFamily, DB, IteratorMode, Options, WriteBatch};
use serde::{Deserialize, Serialize};
use crate::port_scan::port_scan::ScanResult;
static COLUMN_COUNT: usize = 5;
static TEST_ROW_COUNT: usize = 1000;
// Global settings for optimal performance
const BLOCK_CACHE_SIZE_MB: usize = 512; // 512MB block cache
@@ -51,14 +42,6 @@ impl StringRow {
}
}
/// Enum for defining search criteria
#[derive(Debug)]
enum SearchCriteria {
ByColumnValue(usize, String), // Search by specific column value
ByColumnPrefix(usize, String), // Search by column value prefix
// ByIdRange(String, String), // Search by ID range
}
impl ResultDatabase {
pub fn new(path: &str) -> Self {
let mut options = Options::default();
@@ -256,7 +239,7 @@ impl ResultDatabase {
if value_str.contains(substring) {
// Convert key to string and add to results
if let Ok(key_str) = std::str::from_utf8(&key_bytes) {
if let Some(row) = decode_row_binary(&value_bytes) {
if let Some(row) = decode_row_binary(key_str, &value_bytes) {
matching_keys.push(row);
}
}
@@ -268,95 +251,6 @@ impl ResultDatabase {
}
}
// Count results from a search without printing
fn search(
db: &DB,
cf_default: &ColumnFamily,
cf_columns: &[&ColumnFamily],
criteria: SearchCriteria,
) -> Result<Vec<StringRow>, Box<dyn std::error::Error>> {
let mut results: Vec<StringRow> = Vec::new();
match criteria {
SearchCriteria::ByColumnValue(col_idx, value) => {
if col_idx >= cf_columns.len() {
return Ok(results);
}
// Create search key with escaped value
let prefix = format!("{}:", fast_escape(&value));
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);
let iterator = db.iterator_cf_opt(
cf_columns[col_idx],
opts,
rocksdb::IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
);
for item in iterator {
let (idx_key, data) = item?;
let idx_key_str = String::from_utf8(idx_key.to_vec())?;
// Skip if we've moved past our prefix
if !idx_key_str.starts_with(&prefix) {
break;
}
let row = decode_row_binary(&data);
if let Some(row) = row {
results.push(row);
}
}
}
SearchCriteria::ByColumnPrefix(col_idx, prefix) => {
if col_idx >= cf_columns.len() {
return Ok(results);
}
// Create search key with escaped prefix
let search_prefix = fast_escape(&prefix);
let iterator = db.iterator_cf(
cf_columns[col_idx],
rocksdb::IteratorMode::From(search_prefix.as_bytes(), rocksdb::Direction::Forward),
);
for item in iterator {
let (idx_key, data) = item?;
let idx_key_str = String::from_utf8(idx_key.to_vec())?;
// Extract just the value part of the index key
let parts: Vec<&str> = idx_key_str.splitn(2, ':').collect();
if parts.len() < 2 {
continue;
}
let value_part = fast_unescape(parts[0]);
// Skip if value doesn't start with our prefix
if !value_part.starts_with(&prefix) {
// If we've moved past potential matches, break early
if value_part > prefix {
break;
}
continue;
}
let row = decode_row_binary(&data);
if let Some(row) = row {
results.push(row);
}
}
}
}
Ok(results)
}
// Fast minimal escaping for key values
#[inline]
fn fast_escape(s: &str) -> String {
@@ -366,62 +260,22 @@ fn fast_escape(s: &str) -> String {
// Fast unescaping for key values
#[inline]
fn fast_unescape(s: &str) -> String {
// Only unescape the colon
s.replace("\\:", ":")
}
// Fast direct row fetch by ID
fn fetch_row(db: &DB, cf_default: &ColumnFamily, row_id: &str) -> Option<StringRow> {
match db.get_cf(cf_default, row_id.as_bytes()) {
Ok(Some(value)) => decode_row_binary(&value),
Ok(Some(value)) => decode_row_binary(row_id, &value),
_ => None,
}
}
// Fast column value fetch
fn fetch_column(db: &DB, cf_default: &ColumnFamily, row_id: &str, column_idx: usize) -> String {
match fetch_row(db, cf_default, row_id) {
Some(row) => get_column_value(&row, column_idx),
None => String::new(),
}
}
// Get a column value, returning empty string if column doesn't exist
#[inline]
fn get_column_value(row: &StringRow, column_index: usize) -> String {
if column_index < row.values.len() {
row.values[column_index].clone()
} else {
String::new() // Return empty string for missing columns
}
}
// Binary decoding of row data
fn decode_row_binary(data: &[u8]) -> Option<StringRow> {
fn decode_row_binary(key: &str, data: &[u8]) -> Option<StringRow> {
if data.len() < 8 {
return None;
}
let mut pos = 0;
// Read ID length
let mut id_len_bytes = [0u8; 4];
id_len_bytes.copy_from_slice(&data[pos..pos + 4]);
let id_len = u32::from_le_bytes(id_len_bytes) as usize;
pos += 4;
// Read ID
if pos + id_len > data.len() {
return None;
}
let id = String::from_utf8_lossy(&data[pos..pos + id_len]).to_string();
pos += id_len;
// Read number of values
if pos + 4 > data.len() {
return None;
}
let mut values_count_bytes = [0u8; 4];
values_count_bytes.copy_from_slice(&data[pos..pos + 4]);
let values_count = u32::from_le_bytes(values_count_bytes) as usize;
@@ -448,16 +302,14 @@ fn decode_row_binary(data: &[u8]) -> Option<StringRow> {
pos += value_len;
}
Some(StringRow { id, values })
Some(StringRow {
id: key.to_string(),
values,
})
}
// Binary encoding of row data for maximum performance
fn encode_row_binary(buf: &mut Vec<u8>, row: &StringRow) {
// Write ID length and ID
let id_bytes = row.id.as_bytes();
buf.extend_from_slice(&(id_bytes.len() as u32).to_le_bytes());
buf.extend_from_slice(id_bytes);
// Write number of values
buf.extend_from_slice(&(row.values.len() as u32).to_le_bytes());
@@ -468,157 +320,3 @@ fn encode_row_binary(buf: &mut Vec<u8>, row: &StringRow) {
buf.extend_from_slice(value_bytes);
}
}
// fn benchmark_create_rows() {
// for i in 0..10000 {
// // Generate 10,000 test rows
// let mut values = Vec::with_capacity(5);
// // Add IP address (column 0)
// if i % 3 == 0 {
// values.push(format!("192.168.1.{}", i % 255));
// } else if i % 3 == 1 {
// values.push(format!("10.0.{}.{}", (i / 255) % 255, i % 255));
// } else {
// values.push(format!("172.16.{}.{}", (i / 255) % 255, i % 255));
// }
// // Add status (column 1)
// if i % 5 < 4 {
// // 80% active
// values.push("active".to_string());
// } else {
// values.push("inactive".to_string());
// }
// // Add response time (column 2) for active servers
// if i % 5 < 4 {
// values.push(format!("{}ms", (i % 100) + 1));
// }
// // Add server name (column 3)
// if i % 2 == 0 {
// values.push(format!("server{:04}", i));
// }
// // Add priority (column 4) for some servers
// if i % 7 == 0 {
// values.push("high_priority".to_string());
// } else if i % 11 == 0 {
// values.push("low_priority".to_string());
// }
// // string_rows.push(StringRow {
// // id: format!("row{:06}", i),
// // values,
// // });
// }
// }
// // Benchmark search performance
// fn benchmark_search<F>(
// db: &DB,
// cf_default: &ColumnFamily,
// cf_columns: &[&ColumnFamily],
// name: &str,
// criteria_fn: F,
// ) -> Result<(), Box<dyn std::error::Error>>
// where
// F: Fn() -> SearchCriteria,
// {
// let mut total_duration = Duration::from_secs(0);
// let mut total_results = 0;
// for i in 1..=3 {
// let criteria = criteria_fn();
// let start = Instant::now();
// let count = count_search_results(db, cf_default, cf_columns, criteria)?;
// let duration = start.elapsed();
// total_duration += duration;
// total_results = count; // All runs should return same count
// println!(" Run {}: Found {} results in {:?}", i, count, duration);
// }
// let avg_duration = total_duration / 3;
// println!(
// " Average: {:?} for {} results",
// avg_duration, total_results
// );
// println!(
// " Speed: {:.2} results/ms",
// total_results as f64 / avg_duration.as_millis() as f64
// );
// Ok(())
// }
// // Benchmark direct row fetch performance
// fn benchmark_direct_fetch(
// db: &DB,
// cf_default: &ColumnFamily,
// name: &str,
// row_id: &str,
// ) -> Result<(), Box<dyn std::error::Error>> {
// let mut total_duration = Duration::from_secs(0);
// for i in 1..=3 {
// let start = Instant::now();
// // Do multiple fetches to get a measurable time
// for _ in 0..1000 {
// let _ = fetch_row(db, cf_default, row_id);
// }
// let duration = start.elapsed();
// total_duration += duration;
// println!(" Run {}: 1000 row fetches in {:?}", i, duration);
// }
// let avg_duration = total_duration / 3;
// println!(" Average: {:?} for 1000 fetches", avg_duration);
// println!(
// " Speed: {:.2} fetches/ms",
// 1000.0 / avg_duration.as_millis() as f64
// );
// Ok(())
// }
// // Benchmark column fetch performance
// fn benchmark_column_fetch(
// db: &DB,
// cf_default: &ColumnFamily,
// name: &str,
// row_id: &str,
// col_idx: usize,
// ) -> Result<(), Box<dyn std::error::Error>> {
// let mut total_duration = Duration::from_secs(0);
// for i in 1..=3 {
// let start = Instant::now();
// // Do multiple fetches to get a measurable time
// for _ in 0..1000 {
// let _ = fetch_column(db, cf_default, row_id, col_idx);
// }
// let duration = start.elapsed();
// total_duration += duration;
// println!(" Run {}: 1000 column fetches in {:?}", i, duration);
// }
// let avg_duration = total_duration / 3;
// println!(" Average: {:?} for 1000 fetches", avg_duration);
// println!(
// " Speed: {:.2} fetches/ms",
// 1000.0 / avg_duration.as_millis() as f64
// );
// Ok(())
// }
//
// // Example usage with batching for very large datasets