Add Scan Settings page, Work on saving data

TODO:
- Finish saving data
This commit is contained in:
astatin3
2024-04-19 23:11:46 -06:00
parent 96285a4c58
commit c21af54bcd
7 changed files with 481 additions and 77 deletions
+150 -46
View File
@@ -5,6 +5,10 @@ import importlib
import socket
import struct
import re
import zlib
import base64
import shutil
import resource
from threading import Thread
import nmap
@@ -17,6 +21,11 @@ import libs.scanners.udpScanner as udpScanner
portScanners = []
tasks = []
excludeRanges = []
downIps = 0
upIps = 0
globalSettings = {}
for script in utils.listSubdirs(utils.getRoot("libs/scanners/")):
if not script.endswith(".py"): continue
@@ -31,37 +40,20 @@ for script in utils.listSubdirs(utils.getRoot("libs/scanners/")):
print(f'Imported: {utils.getRoot(f"libs/scanners/{module.__name__}")}')
soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard_limit, hard_limit))
def scan(host:str, port: int, protocol: str):
error = False
results = ""
for scanner in portScanners:
if str(scanner.__name__) == f'{protocol}{port}.py':
scanResults, error = scanner.scan(host, port)
results += f'[{scanner.__name__}, {host}:{port}] {scanResults}'
if not error:
return results
else:
results += " Trying default scanner... "
if protocol == "tcp":
scanResults, error = tcpScanner.scan(host, port)
results += f'[{tcpScanner.__name__}, {host}:{port}] {scanResults}'
elif protocol == "udp":
scanResults, error = udpScanner.scan(host, port)
results += f'[{udpScanner.__name__}, {host}:{port}] {scanResults}'
else:
raise Exception("This should not happen...")
return results
def start(settings):
global tasks
global globalSettings
globalSettings = settings
if processStarted(): return
print("\n\nStarted Scanner!")
print("\n\n\n", end='')
utils.makeDir("data/scans")
portString = ""
@@ -89,9 +81,11 @@ def start(settings):
case 3:
portString += "T:" + ",".join(map(str, scanutils.portsRelatedTo('tcp', settings['tcpSettings']['relatedString'])))
global excludeRanges
excludeRanges = scanutils.parseIpList(utils.getRoot("exclude.conf"))
for i in range(0,settings['numJobs']):
c = ScanTask(i)
c = ScanTask(i+1)
t = Thread(target = c.run, args=(
settings['maxPingTimeout'],
settings['maxNmapTimeout'],
@@ -102,6 +96,7 @@ def start(settings):
def stop():
if not processStarted(): return
global tasks
for task in tasks:
task.stop()
@@ -114,62 +109,171 @@ def processStarted():
return len(tasks) != 0;
class hostScanDetail:
def __init__(self):
self.address = None
self.hostname = None
# self.
def parseNmapResult(result: object, host: str):
hostname = result.hostname()
resultstr = f'### {host} ({hostname}) {result.keys()}\n'
# dict_keys(['hostnames', 'addresses', 'vendor', 'status', 'tcp', 'portused', 'osmatch'])
# resultstr += f'Location: {scanutils.geolocation(host)}\n'
resultstr = '### Start Host Info ###\n'
resultstr += f'Address: {host}\n'
resultstr += 'Status: Up\n'
resultstr += f'Hostname: {result.hostname()}\n'
resultstr += f'Location: {scanutils.geolocation(host)}\n'
osInfo = []
if 'osmatch' in result:
for os in result['osmatch']:
osInfo.append([os["accuracy"], os["name"]])
resultstr += f'OS-Info: {osInfo}\n'
for protocol in result.all_protocols():
for portInt in result[protocol].keys():
port = result[protocol][portInt]
if port['state'] != 'open':
continue
resultstr += f'Port: {portInt},{protocol},{port["state"]},{port["reason"]}'
resultstr += scan(host, portInt, protocol) + "\n"
if port['state'] == 'open':
data = scan(host, portInt, protocol)
compressedData = base64.b64encode(zlib.compress(data.encode())).decode('ASCII')
resultstr += f',{compressedData}'
print(resultstr)
resultstr += "\n"
resultstr += '### End Host Info ###\n'
print(resultstr, end='')
def addOfflineHost(host:str):
string = '### Start Host Info ###\n' + \
f'Address: {host}\n' + \
f'Status: Down\n' + \
'### End Host Info ###\n'
# print(string, end='')
def scan(host:str, port: int, protocol: str):
error = False
results = ""
for scanner in portScanners:
if str(scanner.__name__) == f'{protocol}{port}.py':
scanResults, error = scanner.scan(host, port)
results += f'[{scanner.__name__}, {host}:{port}] {scanResults}'
if not error:
return results
else:
results += " Trying default scanner... "
if protocol == "tcp":
scanResults, error = tcpScanner.scan(host, port)
results += f'[{tcpScanner.__name__}, {host}:{port}] {scanResults}'
elif protocol == "udp":
scanResults, error = udpScanner.scan(host, port)
results += f'[{udpScanner.__name__}, {host}:{port}] {scanResults}'
else:
raise Exception("This should not happen...")
return results
# def saveData()
def printBar(percentage: float, cols: int):
return ("#" * round(percentage*cols)) + ("-" * round((1-percentage) * cols))
def printIndicator():
return
hostSearchingCount = 0
nmapScanningCount = 0
furtherScanningCount = 0
for task in tasks:
match task.status:
case 1:
hostSearchingCount += 1
case 2:
nmapScanningCount += 1
case 3:
furtherScanningCount += 1
width = shutil.get_terminal_size((80, 20)).columns
global globalSettings
numJobs = int(globalSettings['numJobs'])
print("\033[F\033[F\033[F" +
f"P: {printBar(hostSearchingCount/numJobs,(width-3))}\n" +
f"N: {printBar(nmapScanningCount/numJobs,(width-3))}\n" +
f"S: {printBar(furtherScanningCount/numJobs,(width-3))}\n", end="")
# print(f"1: {hostSearchingCount}, " +
# f"2: {nmapScanningCount}, " +
# f"3: {furtherScanningCount}", end="\r")
class ScanTask:
def __init__(self, threadid: int):
self.threadid = threadid
self.running = True
self.nm = nmap.PortScanner()
self.pingsock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
self.status = None
def stop(self):
self.running = False
self.running = False
def run(self, maxPingTimeout: int, maxNmapTimeout: int, nmapGroupSize: int, portString: str):
global upIps
global downIps
global excludeRanges
while self.running:
self.status = 1
printIndicator()
ipGroup = []
while len(ipGroup) < nmapGroupSize and self.running:
address = socket.inet_ntoa(struct.pack('>I', random.randint(1, 0xffffffff)))
pingCommand = f"ping {address} -c 1 -W {maxPingTimeout}"
if scanutils.ipInArray(address, excludeRanges):
# print(f"Tried {address}")
continue
try:
subprocess.check_output(pingCommand.split(" "))
print(f"{self.threadid} {address}: FOUND {len(ipGroup)+1}/{nmapGroupSize}")
if scanutils.ping(address, maxPingTimeout, self.pingsock):
# print(f"{self.threadid} {address}: FOUND {len(ipGroup)+1}/{nmapGroupSize}")
upIps += 1
ipGroup.append(address)
except subprocess.CalledProcessError:
else:
addOfflineHost(address)
downIps += 1
# print(f"{self.threadid} {address}: FAIL")
continue
print(f'Scanning: {ipGroup}')
if not self.running: return
self.status = 2
printIndicator()
self.nm.scan(hosts=' '.join(ipGroup), ports=portString, arguments="-O --send-eth --privileged -sS --reason -sU")
if not self.running: return
self.status = 3
printIndicator()
for address in self.nm.all_hosts():
parseNmapResult(self.nm[address], address)
# nmapCommand = f"sudo nmap {address} -O --send-eth --privileged -v -sS --reason -sU -p {portString}"
# try:
# parseNmapResult(subprocess.check_output(nmapCommand.split(" ")).decode(), address)
# except subprocess.CalledProcessError:
# continue
+120 -11
View File
@@ -5,16 +5,83 @@ import os
import re
import geoip2.database
def countScannedIps():
files = utils.listSubdirs("data/")
count = 0
for file in files:
if file.split("-")[0] != "scan":
continue
with open('data/'+file) as f:
#Count lines in scan files, Masscan has a 2 line header, so hence -2
count += sum(1 for _ in f)-2
return count
import socket
import struct
import time
def checksum(data):
"""
Calculate the checksum of the ICMP packet data.
"""
sum = 0
for i in range(0, len(data), 2):
sum += (data[i] << 8) + data[i+1]
sum = (sum & 0xffff) + (sum >> 16)
sum = ~sum & 0xffff
return sum
import time
import random
import select
import array
def ping_chksum(packet:bytes):
if len(packet) % 2 != 0:
packet += b'\0'
res = sum(struct.unpack("!%sH" % (len(packet) // 2), packet))
res = (res >> 16) + (res & 0xffff)
res += res >> 16
return (~res) & 0xffff
def ping(host:str, timeout:int, sock):
returnVal = (False, -1)
try:
# Craft the ICMP echo request packet
packet_id = int(time.time() * 1000) & 0xFFFF
header = struct.pack("bbHHh", 8, 0, 0, packet_id, 1)
data = b"ping"
checksum = ping_chksum(header + data)
header = struct.pack("bbHHh", 8, 0, socket.htons(checksum), packet_id, 1)
packet = header + data
# Send the ICMP echo request
sock.sendto(packet, (host, 1))
# Receive the ICMP echo reply
start_time = time.time()
while True:
remaining_time = timeout - (time.time() - start_time)
if remaining_time <= 0:
return False
ready = select.select([sock], [], [], remaining_time)
if ready[0]:
data, addr = sock.recvfrom(1024)
icmp_header = data[20:28]
type, code, checksum, p_id, sequence = struct.unpack("bbHHh", icmp_header)
if p_id == packet_id:
return returVal
except:pass
return returnVal
# return False
# def countScannedIps():
# files = utils.listSubdirs("data/")
# count = 0
# for file in files:
# if file.split("-")[0] != "scan":
# continue
# with open('data/'+file) as f:
# #Count lines in scan files, Masscan has a 2 line header, so hence -2
# count += sum(1 for _ in f)-2
# return count
def getMostCommon(protocol: str, n: int):
nmap_services_file = "/usr/share/nmap/nmap-services"
@@ -125,7 +192,7 @@ if not utils.pathExists(ASN_DB_PATH):
def geolocation(ip_address):
def geolocation(ip_address: str):
try:
# Attempt to retrieve city-level information
with geoip2.database.Reader(CITY_DB_PATH) as reader:
@@ -166,3 +233,45 @@ def geolocation(ip_address):
'ip': ip_address,
'error': 'No geolocation data found'
}
def parseIpList(path: str):
with open(path, "r") as f:
lines = f.readlines()
return [line.rstrip()
for line in lines
if not line.startswith('#') and not line.startswith('\n')
]
def ipToInt(ip: str):
octets = [int(n) for n in ip.split('.')]
return (octets[0] << 24) + (octets[1] << 16) + (octets[2] << 8) + octets[3]
def ipInCIDR(ip: str, ip_CIDR: str):
range_parts = ip_CIDR.split('/')
range_mask = int(range_parts[1])
range_mask_num = (0xFFFFFFFF << (32 - range_mask)) & 0xFFFFFFFF
return (ipToInt(ip) & range_mask_num) == (ipToInt(range_parts[0]) & range_mask_num)
def ipInRange(ip: str, ip_range: str):
ip_int = ipToInt(ip)
start_ip_int, end_ip_int = [ipToInt(ip) for ip in ip_range.split('-')]
return start_ip_int <= ip_int <= end_ip_int
def ipInArray(ip: str, ipRangeArray: list):
for ipRange in ipRangeArray:
if "/" in ipRange:
if ipInCIDR(ip, ipRange):
return True
elif "-" in ipRange:
if ipInRange(ip, ipRange):
return True
elif ip == ipRange:
return True
return False