diff --git a/unit_test.py b/unit_test.py index 3ae1e0f..ede2f4a 100644 --- a/unit_test.py +++ b/unit_test.py @@ -2,7 +2,7 @@ # import sqlite3 import unittest # from database_utils import * -import database_utils +from utils import database_utils class TestFunctions(unittest.TestCase): @@ -10,7 +10,8 @@ class TestFunctions(unittest.TestCase): def setUp(self): self.verbose = False self.database_name = 'test_database.db' - self.database = database_utils.connectDatabase(self.database_name, self.verbose) + self.database = database_utils.connectDatabase(self.database_name, + self.verbose) database_utils.createDatabase(self.database, self.verbose) database_utils.createViews(self.database, self.verbose) self.c = self.database.cursor() @@ -38,14 +39,16 @@ def test_insertAP(self): mfpc = 'False' mfpr = 'False' # Insert new AP - result = database_utils.insertAP(self.c, self.verbose, self.bssid, essid, - manuf, channel, freqmhz, carrier, encryption, - packets_total, lat, lon, cloaked, mfpc, mfpr, 0) + result = database_utils.insertAP(self.c, self.verbose, self.bssid, + essid, manuf, channel, freqmhz, + carrier, encryption, packets_total, + lat, lon, cloaked, mfpc, mfpr, 0) self.assertEqual(result, 0) # TODO Insert existing AP with new values # manuf = "Updated_Manufacturer" - # result = insertAP(self.c, False, bssid, essid, manuf, channel, freqmhz, carrier, + # result = insertAP(self.c, False, bssid, essid, manuf, channel, + # freqmhz, carrier, # encryption, packets_total, lat, lon, cloaked) # self.assertEqual(result, 0) # self.c.execute("SELECT * FROM AP WHERE bssid=?", (bssid,)) @@ -59,12 +62,15 @@ def test_insertClients(self): packets_total = "10" power = "-70" # Insert new client - result = database_utils.insertClients(self.c, self.verbose, self.mac, ssid, manuf, packets_total, power, "Misc", 0) + result = database_utils.insertClients(self.c, self.verbose, self.mac, + ssid, manuf, packets_total, + power, "Misc", 0) self.assertEqual(result, 0) # TODO Insert existing client with new values # manuf = "Updated_Manufacturer" - # result = insertClients(self.c, False, mac, ssid, manuf, packets_total, power, "Misc") + # result = insertClients(self.c, False, mac, ssid, manuf, + # packets_total, power, "Misc") # self.assertEqual(result, 0) # self.c.execute("SELECT * FROM CLIENT WHERE mac=?", (mac,)) # rows = self.c.fetchall() @@ -82,15 +88,20 @@ def test_insertWPS(self): wps_config_methods_keypad = True # Insert new WPS - result = database_utils.insertWPS(self.c, self.verbose, self.bssid, wlan_ssid, - wps_version, wps_device_name, wps_model_name, - wps_model_number, wps_config_methods, wps_config_methods_keypad) + result = database_utils.insertWPS(self.c, self.verbose, self.bssid, + wlan_ssid, wps_version, + wps_device_name, wps_model_name, + wps_model_number, + wps_config_methods, + wps_config_methods_keypad) self.assertEqual(result, 0) # TODO Insert existing WPS with new values # wps_device_name = "Updated_Device" - # result = insertWPS(self.c, self.verbose, bssid, wlan_ssid, wps_version, - # wps_device_name, wps_model_name, wps_model_number, wps_config_methods, wps_config_methods_keypad) + # result = insertWPS(self.c, self.verbose, bssid, wlan_ssid, + # wps_version, + # wps_device_name, wps_model_name, wps_model_number, + # wps_config_methods, wps_config_methods_keypad) # self.assertEqual(result, 0) # self.c.execute("SELECT * FROM WPS WHERE wlan_ssid=?", (wlan_ssid,)) # rows = self.c.fetchall() @@ -112,9 +123,10 @@ def test_insertConnected(self): mfpc = 'False' mfpr = 'False' # Insert new AP - result = database_utils.insertAP(self.c, self.verbose, self.bssid, essid, - manuf, channel, freqmhz, carrier, encryption, - packets_total, lat, lon, cloaked, mfpc, mfpr, 0) + result = database_utils.insertAP(self.c, self.verbose, self.bssid, + essid, manuf, channel, freqmhz, + carrier, encryption, packets_total, + lat, lon, cloaked, mfpc, mfpr, 0) self.assertEqual(result, 0) @@ -123,24 +135,30 @@ def test_insertConnected(self): packets_total = "10" power = "-70" # Insert new client - result = database_utils.insertClients(self.c, self.verbose, self.mac, ssid, manuf, packets_total, power, "Misc", 0) + result = database_utils.insertClients(self.c, self.verbose, self.mac, + ssid, manuf, packets_total, + power, "Misc", 0) self.assertEqual(result, 0) # Insert new connected device - result = database_utils.insertConnected(self.c, self.verbose, self.bssid, self.mac) + result = database_utils.insertConnected(self.c, self.verbose, + self.bssid, self.mac) self.assertEqual(result, 0) def test_inserFile(self): - path = "file://path" + script_path = os.path.dirname(os.path.abspath(__file__)) + path = script_path+"/README.md" result = database_utils.insertFile(self.c, self.verbose, path) self.assertEqual(result, 0) def test_insertHandshake(self): - path = "file://path" + script_path = os.path.dirname(os.path.abspath(__file__)) + path = script_path+"/README.md" - result = database_utils.insertHandshake(self.c, self.verbose, self.bssid, self.mac, path) + result = database_utils.insertHandshake(self.c, self.verbose, + self.bssid, self.mac, path) self.assertEqual(result, 0) self.c.execute("SELECT * FROM handshake WHERE bssid=?", (self.bssid,)) @@ -151,9 +169,12 @@ def test_insertHandshake(self): def test_insertIdentity(self): identity = "DOMAIN\\username" method = "EAP-PEAP" - result = database_utils.insertIdentity(self.c, self.verbose, self.bssid, self.mac, identity, method) + result = database_utils.insertIdentity(self.c, self.verbose, + self.bssid, self.mac, identity, + method) self.assertEqual(result, 0) - self.c.execute("SELECT identity FROM Identity WHERE mac=?", (self.mac,)) + self.c.execute("SELECT identity FROM Identity WHERE mac=?", + (self.mac,)) row = self.c.fetchone() self.assertEqual(row[0], identity) @@ -164,8 +185,9 @@ def test_insertSeenClient(self): packets_total = "10" power = "-70" # Insert new client - result = database_utils.insertClients(self.c, self.verbose, self.mac, ssid, manuf, packets_total, power, "Misc", 0) - + result = database_utils.insertClients(self.c, self.verbose, self.mac, + ssid, manuf, packets_total, + power, "Misc", 0) # Insert seenClient # station = "Test_Station" @@ -175,7 +197,9 @@ def test_insertSeenClient(self): lat = "37.7749" lon = "-122.4194" alt = "10000" - result = database_utils.insertSeenClient(self.c, self.verbose, self.mac, time, tool, power, lat, lon, alt) + result = database_utils.insertSeenClient(self.c, self.verbose, + self.mac, time, tool, power, + lat, lon, alt) self.assertEqual(result, 0) self.c.execute("SELECT * FROM SeenClient WHERE mac=?", (self.mac,)) row = self.c.fetchone() @@ -198,8 +222,9 @@ def test_insertSeenAP(self): mfpc = 'False' mfpr = 'False' # Insert new AP - result = database_utils.insertAP(self.c, self.verbose, self.bssid, essid, manuf, - channel, freqmhz, carrier, encryption, packets_total, + result = database_utils.insertAP(self.c, self.verbose, self.bssid, + essid, manuf, channel, freqmhz, + carrier, encryption, packets_total, lat, lon, cloaked, mfpc, mfpr, 0) self.assertEqual(result, 0) @@ -213,7 +238,8 @@ def test_insertSeenAP(self): alt = "10000" bsstimestamp = "2032-02-23 10:00:00" result = database_utils.insertSeenAP(self.c, self.verbose, self.bssid, - time, tool, signal_rsi, lat, lon, alt, bsstimestamp) + time, tool, signal_rsi, lat, lon, + alt, bsstimestamp) self.assertEqual(result, 0) self.c.execute("SELECT * FROM SeenAP WHERE bssid=?", (self.bssid,)) row = self.c.fetchone() @@ -236,8 +262,9 @@ def test_setHashcat(self): mfpr = 'False' # Insert new AP result = database_utils.insertAP(self.c, self.verbose, self.bssid, - essid, manuf, channel, freqmhz, carrier, - encryption, packets_total, lat, lon, cloaked, mfpc, mfpr, 0) + essid, manuf, channel, freqmhz, + carrier, encryption, packets_total, + lat, lon, cloaked, mfpc, mfpr, 0) self.assertEqual(result, 0) @@ -246,20 +273,26 @@ def test_setHashcat(self): packets_total = "10" power = "-70" # Insert new client - result = database_utils.insertClients(self.c, self.verbose, self.mac, ssid, manuf, packets_total, power, "Misc", 0) + result = database_utils.insertClients(self.c, self.verbose, self.mac, + ssid, manuf, packets_total, + power, "Misc", 0) self.assertEqual(result, 0) # insert Handshake - path = "file://path" + script_path = os.path.dirname(os.path.abspath(__file__)) + path = script_path+"/README.md" - result = database_utils.insertHandshake(self.c, self.verbose, self.bssid, self.mac, path) + result = database_utils.insertHandshake(self.c, self.verbose, + self.bssid, self.mac, path) self.assertEqual(result, 0) # Insert hashcat HASH - path = "file://path" + script_path = os.path.dirname(os.path.abspath(__file__)) + path = script_path+"/README.md" test_hashcat = "aa:bb:cc:dd:ee:ff:11:22:33:44:55:66:77" - result = database_utils.setHashcat(self.c, self.verbose, self.bssid, self.mac, path, test_hashcat) + result = database_utils.setHashcat(self.c, self.verbose, self.bssid, + self.mac, path, test_hashcat) self.assertEqual(result, 0) self.c.execute("SELECT * FROM handshake WHERE bssid=?", (self.bssid,)) rows = self.c.fetchall() @@ -280,9 +313,10 @@ def test_obfuscateDB(self): mfpc = 'False' mfpr = 'False' # Insert new AP - result = database_utils.insertAP(self.c, self.verbose, self.bssid, essid, - manufAP, channel, freqmhz, carrier, encryption, - packets_total, lat, lon, cloaked, mfpc, mfpr, 0) + result = database_utils.insertAP(self.c, self.verbose, self.bssid, + essid, manufAP, channel, freqmhz, + carrier, encryption, packets_total, + lat, lon, cloaked, mfpc, mfpr, 0) self.assertEqual(result, 0) @@ -291,22 +325,26 @@ def test_obfuscateDB(self): packets_total = "10" power = "-70" # Insert new client - result = database_utils.insertClients(self.c, self.verbose, self.mac, ssid, - manufClient, packets_total, power, "Misc", 0) + result = database_utils.insertClients(self.c, self.verbose, self.mac, + ssid, manufClient, packets_total, + power, "Misc", 0) self.assertEqual(result, 0) # insert Handshake - path = "file://path" + script_path = os.path.dirname(os.path.abspath(__file__)) + path = script_path+"/README.md" - result = database_utils.insertHandshake(self.c, self.verbose, self.bssid, self.mac, path) + result = database_utils.insertHandshake(self.c, self.verbose, + self.bssid, self.mac, path) self.assertEqual(result, 0) # obfuscateDB result = database_utils.obfuscateDB(self.database, self.verbose) self.assertEqual(result, 0) - # self.c.execute("SELECT * FROM handshake WHERE bssid=?", (self.bssid,)) + # self.c.execute("SELECT * FROM handshake WHERE bssid=?", + # (self.bssid,)) self.c.execute("SELECT * FROM AP WHERE ssid=?", (essid,)) rows = self.c.fetchall() # Same ESSID but different BSSID diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/database_utils.py b/utils/database_utils.py similarity index 85% rename from database_utils.py rename to utils/database_utils.py index 7766766..b9a21ee 100644 --- a/database_utils.py +++ b/utils/database_utils.py @@ -59,12 +59,15 @@ def createViews(database, verbose): def insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, carrier, - encryption, packets_total, lat, lon, cloaked, mfpc, mfpr, firstTimeSeen): + encryption, packets_total, lat, lon, cloaked, mfpc, mfpr, + firstTimeSeen): '''''' try: - cursor.execute('''INSERT INTO AP VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?) ''', - (bssid.upper(), essid, cloaked, manuf, channel, freqmhz, carrier, - encryption, packets_total, lat, lon, mfpc, mfpr, firstTimeSeen)) + cursor.execute('''INSERT INTO AP VALUES + (?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', + (bssid.upper(), essid, cloaked, manuf, channel, freqmhz, + carrier, encryption, packets_total, lat, lon, mfpc, + mfpr, firstTimeSeen)) return int(0) except sqlite3.IntegrityError as error: @@ -76,12 +79,15 @@ def insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, carrier, # If firstTimeSeen is before current firstTimeSeen update # Update `firstTimeSeen` column if firstTimeSeen != 0: - sql = """UPDATE AP SET firstTimeSeen = CASE WHEN (firstTimeSeen = '' OR firstTimeSeen = '0' OR - firstTimeSeen IS NULL OR firstTimeSeen > (?)) AND (?) <> 0 AND firstTimeSeen <> 0 THEN (?) ELSE firstTimeSeen END WHERE bssid = (?)""" + sql = """UPDATE AP SET firstTimeSeen = CASE WHEN + firstTimeSeen = '' OR firstTimeSeen = '0' OR + firstTimeSeen IS NULL OR firstTimeSeen > (?)) AND + (?) <> 0 AND firstTimeSeen <> 0 THEN (?) ELSE + firstTimeSeen END WHERE bssid = (?)""" if verbose: print(sql, (firstTimeSeen, bssid)) - cursor.execute(sql, (firstTimeSeen, firstTimeSeen, firstTimeSeen, bssid.upper())) - + cursor.execute(sql, (firstTimeSeen, firstTimeSeen, + firstTimeSeen, bssid.upper())) # Write if empty sql = """UPDATE AP SET ssid = CASE WHEN ssid = '' OR @@ -91,36 +97,40 @@ def insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, carrier, cursor.execute(sql, (essid, bssid.upper())) # Update `manuf` column - sql = """UPDATE AP SET manuf = CASE WHEN manuf = '' OR manuf IS NULL - THEN (?) ELSE manuf END WHERE bssid = (?)""" + sql = """UPDATE AP SET manuf = CASE WHEN manuf = '' OR manuf IS + NULL THEN (?) ELSE manuf END WHERE bssid = (?)""" if verbose: print(sql, (manuf, bssid.upper())) cursor.execute(sql, (manuf, bssid.upper())) # Update `channel` column - sql = """UPDATE AP SET channel = CASE WHEN channel = '' OR channel IS NULL - OR channel = 0 THEN (?) ELSE channel END WHERE bssid = (?)""" + sql = """UPDATE AP SET channel = CASE WHEN channel = '' OR channel + IS NULL OR channel = 0 THEN (?) ELSE channel END + WHERE bssid = (?)""" if verbose: print(sql, (channel, bssid)) cursor.execute(sql, (channel, bssid.upper())) # Update `frequency` column sql = """UPDATE AP SET frequency = CASE WHEN frequency = '' OR - frequency IS NULL OR frequency < 2000 THEN (?) ELSE frequency END WHERE bssid = (?)""" + frequency IS NULL OR frequency < 2000 THEN (?) ELSE + frequency END WHERE bssid = (?)""" if verbose: print(sql, (freqmhz, bssid)) cursor.execute(sql, (freqmhz, bssid.upper())) # Update `carrier` column - sql = """UPDATE AP SET carrier = CASE WHEN carrier = '' OR carrier IS NULL - THEN (?) ELSE carrier END WHERE bssid = (?)""" + sql = """UPDATE AP SET carrier = CASE WHEN carrier = '' OR + carrier IS NULL + THEN (?) ELSE carrier END WHERE bssid = (?)""" if verbose: print(sql, (carrier, bssid)) cursor.execute(sql, (carrier, bssid.upper())) # Update `encryption` column sql = """UPDATE AP SET encryption = CASE WHEN encryption = '' OR - encryption IS NULL THEN (?) ELSE encryption END WHERE bssid = (?)""" + encryption IS NULL THEN (?) ELSE encryption END + WHERE bssid = (?)""" if verbose: print(sql, (encryption, bssid)) cursor.execute(sql, (encryption, bssid.upper())) @@ -141,8 +151,8 @@ def insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, carrier, cursor.execute(sql, (lat, lon, bssid.upper())) # Update `cloaked` column - sql = """UPDATE AP SET cloaked = CASE WHEN cloaked = 'False' THEN (?) - ELSE cloaked END WHERE bssid = (?)""" + sql = """UPDATE AP SET cloaked = CASE WHEN cloaked = 'False' + THEN (?) ELSE cloaked END WHERE bssid = (?)""" if verbose: print(sql, (cloaked, bssid.upper())) cursor.execute(sql, (cloaked, bssid.upper())) @@ -178,7 +188,8 @@ def insertClients(cursor, verbose, mac, ssid, manuf, '''Function to insert clients in the database''' try: cursor.execute('''INSERT INTO client VALUES(?,?,?,?,?,?,?)''', - (mac.upper(), ssid, manuf, type, packets_total, device, firstTimeSeen)) + (mac.upper(), ssid, manuf, type, packets_total, device, + firstTimeSeen)) return int(0) except sqlite3.IntegrityError as error: # errors += 1 @@ -188,47 +199,49 @@ def insertClients(cursor, verbose, mac, ssid, manuf, # If firstTimeSeen is before current firstTimeSeen update # Update `firstTimeSeen` column - if firstTimeSeen != 0: - sql = """UPDATE client SET firstTimeSeen = CASE WHEN (firstTimeSeen = '' OR firstTimeSeen = '0' OR - firstTimeSeen IS NULL OR firstTimeSeen > (?)) AND (?) <> 0 AND firstTimeSeen <> 0 THEN (?) ELSE firstTimeSeen END WHERE mac = (?)""" + if firstTimeSeen != 0: + sql = """UPDATE client SET firstTimeSeen = CASE WHEN + (firstTimeSeen = '' OR firstTimeSeen = '0' OR + firstTimeSeen IS NULL OR firstTimeSeen > (?)) AND + (?) <> 0 AND firstTimeSeen <> 0 THEN (?) ELSE + firstTimeSeen END WHERE mac = (?)""" if verbose: print(sql, (firstTimeSeen, mac)) - cursor.execute(sql, (firstTimeSeen, firstTimeSeen, firstTimeSeen, mac.upper())) - + cursor.execute(sql, (firstTimeSeen, firstTimeSeen, + firstTimeSeen, mac.upper())) # Update `packetsTotal` column sql = """UPDATE client SET packetsTotal = packetsTotal + (?) - WHERE mac = (?)""" + WHERE mac = (?)""" if verbose: print(sql, (packets_total, mac.upper())) cursor.execute(sql, (packets_total, mac.upper())) # Write if empty # Update `ssid` column - sql = """UPDATE client SET ssid = CASE WHEN ssid = '' OR ssid IS NULL - THEN (?) ELSE ssid END WHERE mac = (?)""" + sql = """UPDATE client SET ssid = CASE WHEN ssid = '' OR ssid IS + NULL THEN (?) ELSE ssid END WHERE mac = (?)""" if verbose: print(sql, (ssid, mac.upper())) cursor.execute(sql, (ssid, mac.upper())) # Update `manuf` column - sql = """UPDATE client SET manuf = CASE WHEN manuf = '' OR manuf IS NULL - THEN (?) ELSE manuf END WHERE mac = (?)""" + sql = """UPDATE client SET manuf = CASE WHEN manuf = '' OR manuf IS + NULL THEN (?) ELSE manuf END WHERE mac = (?)""" if verbose: print(sql, (manuf, mac.upper())) cursor.execute(sql, (manuf, mac.upper())) - # Update `type` column - sql = """UPDATE client SET type = CASE WHEN type = '' OR type IS NULL - THEN (?) ELSE type END WHERE mac = (?)""" + sql = """UPDATE client SET type = CASE WHEN type = '' OR type IS + NULL THEN (?) ELSE type END WHERE mac = (?)""" if verbose: print(sql, (type, mac.upper())) cursor.execute(sql, (type, mac.upper())) # Update `manuf` column - sql = """UPDATE client SET device = CASE WHEN device = '' OR device IS NULL - THEN (?) ELSE device END WHERE mac = (?)""" + sql = """UPDATE client SET device = CASE WHEN device = '' OR + device IS NULL THEN (?) ELSE device END WHERE mac = (?)""" if verbose: print(sql, (device, mac.upper())) cursor.execute(sql, (device, mac.upper())) @@ -280,9 +293,9 @@ def insertWPS(cursor, verbose, bssid, wlan_ssid, wps_version, wps_device_name, cloaked = 'False' mfpc = 'False' mfpr = 'False' - insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, carrier, - encryption, packets_total, lat, lon, cloaked, mfpc, mfpr, 0) - + insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, + carrier, encryption, packets_total, lat, lon, cloaked, mfpc, + mfpr, 0) cursor.execute('''INSERT INTO WPS VALUES(?,?,?,?,?,?,?,?)''', (bssid.upper(), wlan_ssid, wps_version, wps_device_name, @@ -334,9 +347,9 @@ def insertMFP(cursor, verbose, bssid, mfpc, mfpr, file): lat = "0.0" lon = "0.0" cloaked = 'False' - insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, carrier, - encryption, packets_total, lat, lon, cloaked, mfpc, mfpr, 0) - + insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, + carrier, encryption, packets_total, lat, lon, cloaked, mfpc, + mfpr, 0) return int(0) except sqlite3.IntegrityError as error: @@ -359,7 +372,8 @@ def insertHandshake(cursor, verbose, bssid, mac, file): error += insertFile(cursor, verbose, file) # Get file hash MD5 - hash = hashlib.md5(open(file,'rb').read()).hexdigest() + with open(file, 'rb') as file_handle: + hash = hashlib.md5(file_handle.read()).hexdigest() # insertHandshake Client and AP CONSTRAINT ssid = "" @@ -368,7 +382,7 @@ def insertHandshake(cursor, verbose, bssid, mac, file): packets_total = "0" device = "" error += insertClients(cursor, verbose, mac, ssid, manuf, - type, packets_total, device, 0) + type, packets_total, device, 0) essid = "" manuf = "" channel = "" @@ -381,8 +395,9 @@ def insertHandshake(cursor, verbose, bssid, mac, file): cloaked = 'False' mfpc = 'False' mfpr = 'False' - error += insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, carrier, - encryption, packets_total, lat, lon, cloaked, mfpc, mfpr, 0) + error += insertAP(cursor, verbose, bssid, essid, manuf, channel, + freqmhz, carrier, encryption, packets_total, lat, + lon, cloaked, mfpc, mfpr, 0) # print(row[5].replace(' ', '')) cursor.execute( @@ -413,7 +428,6 @@ def insertIdentity(cursor, verbose, bssid, mac, identity, method): error += insertClients(cursor, verbose, mac, ssid, manuf, "", packets_total, device, 0) - essid = "" manuf = "" channel = "" @@ -426,9 +440,9 @@ def insertIdentity(cursor, verbose, bssid, mac, identity, method): cloaked = 'False' mfpc = 'False' mfpr = 'False' - error += insertAP(cursor, verbose, bssid, essid, manuf, channel, freqmhz, carrier, - encryption, packets_total, lat, lon, cloaked, mfpc, mfpr, 0) - + error += insertAP(cursor, verbose, bssid, essid, manuf, channel, + freqmhz, carrier, encryption, packets_total, lat, + lon, cloaked, mfpc, mfpr, 0) if verbose: print('output ' + bssid.upper(), mac.upper(), identity, method) @@ -488,10 +502,12 @@ def insertSeenAP(cursor, verbose, bssid, time, tool, signal_rsi, def setHashcat(cursor, verbose, bssid, mac, file, hashcat): try: - hashMD5 = hashlib.md5(open(file,'rb').read()).hexdigest() + with open(file, 'rb') as file_handle: + hashMD5 = hashlib.md5(file_handle.read()).hexdigest() if verbose: print("HASH: ", hash) - cursor.execute('''INSERT OR REPLACE INTO Handshake VALUES(?,?,?,?,?)''', + cursor.execute('''INSERT OR REPLACE INTO Handshake + VALUES(?,?,?,?,?)''', (bssid.upper(), mac.upper(), file, hashMD5, hashcat)) return int(0) except sqlite3.IntegrityError as error: @@ -502,7 +518,8 @@ def setHashcat(cursor, verbose, bssid, mac, file, hashcat): def insertFile(cursor, verbose, file): try: # Get MD5 - hash = hashlib.md5(open(file,'rb').read()).hexdigest() + with open(file, 'rb') as file_handle: + hash = hashlib.md5(file_handle.read()).hexdigest() if verbose: print("HASH: ", hash) cursor.execute('''INSERT OR REPLACE INTO Files VALUES(?,?,?,?)''', @@ -528,10 +545,13 @@ def checkFileProcessed(cursor, verbose, file): if verbose: print("File", file, "does not exist") return int(0) - - hash = hashlib.md5(open(file,'rb').read()).hexdigest() + + with open(file, 'rb') as file_handle: + hash = hashlib.md5(file_handle.read()).hexdigest() + try: - sql = "SELECT file from Files where hashMD5 = '" + hash + "' AND processed = 'True';" + sql = "SELECT file from Files where hashMD5 = '" + hash + \ + "' AND processed = 'True';" cursor.execute(sql) output = cursor.fetchall() @@ -544,7 +564,8 @@ def checkFileProcessed(cursor, verbose, file): return int(2) -# obfuscated the database AA:BB:CC:XX:XX:XX-DEFG, needs database and not cursos to commit +# obfuscated the database AA:BB:CC:XX:XX:XX-DEFG, +# needs database and not cursos to commit def obfuscateDB(database, verbose): # APs! try: diff --git a/oui.py b/utils/oui.py similarity index 97% rename from oui.py rename to utils/oui.py index 0e75ba9..5326cbe 100644 --- a/oui.py +++ b/utils/oui.py @@ -27,8 +27,8 @@ def load_vendors(): if current_time - modification_time < 2 * 60 * 60: print("File was download within the last 2 hours - SKIP") redownload = False - - if redownload: # download again if >2h or file dont exists + + if redownload: # download again if >2h or file dont exists with tempfile.NamedTemporaryFile(delete=True) as tmp: print(tmp.name) try: @@ -59,7 +59,7 @@ def load_vendors(): # catastrophic error. bail. print(e) tmp.close() - #os.unlink(tmp.name) + # os.unlink(tmp.name) with open(fileCSV, encoding='cp850') as csv_file: csv_reader = csv.reader(csv_file, delimiter=',') diff --git a/utils/update.py b/utils/update.py new file mode 100644 index 0000000..4942662 --- /dev/null +++ b/utils/update.py @@ -0,0 +1,76 @@ +import os +import sys +import subprocess +import requests +import re + + +def is_git_installed(): + try: + subprocess.run(["git", "--version"], stdout=subprocess.PIPE, + stderr=subprocess.PIPE, check=True) + return True + except FileNotFoundError: + return False + except subprocess.CalledProcessError: + return False + + +def get_latest_github_release(repo_url): + try: + response = requests.get(f"{repo_url}/releases/latest") + if response.status_code == 200: + latest_release_tag = response.json()["tag_name"] + return latest_release_tag + else: + return None + except Exception as e: + print(e) + return None + + +def check_for_update(VERSION): + repo_url = "https://api.github.com/repos/r4ulcl/wifi_db" + script_path = os.path.abspath(__file__) + script_dir = os.path.dirname(script_path) + + if not is_git_installed(): + print("Git is not installed on your system. Please install Git.") + sys.exit(1) + + latest_release_tag = get_latest_github_release(repo_url) + + if latest_release_tag: + # Get only the number part, without v and -dev + latest_release_tag_number = re.search(r'(\d+(\.\d+)+)', + latest_release_tag).group(1) + current_number = re.search(r'(\d+(\.\d+)+)', VERSION).group(1) + # print(latest_release_tag_number) + # print(current_number) + if latest_release_tag_number > current_number: + user_choice = input("A new version is available (v" + + latest_release_tag_number + + "). Do you want to update (Y/n)?: " + ).strip().lower() or "y" + if user_choice in ("", "y", "Y"): + print("Updating...") + update_process = subprocess.Popen(["git", "pull"], + cwd=script_dir) + # Wait for the Git pull operation to complete + update_process.wait() + print("Update complete. Please run again the script.") + sys.exit() + else: + print("You chose not to update. Running the current version.") + elif latest_release_tag_number < current_number: + print("You are using a future version ;) ("+VERSION+").\n") + else: + print("You are using the latest version ("+VERSION+").\n") + else: + print("Unable to check for updates.") + + +if __name__ == "__main__": + VERSION = 'v1.2' + + check_for_update(VERSION) diff --git a/view.sql b/utils/view.sql similarity index 100% rename from view.sql rename to utils/view.sql diff --git a/wifi_db_aircrack.py b/utils/wifi_db_aircrack.py similarity index 92% rename from wifi_db_aircrack.py rename to utils/wifi_db_aircrack.py index 50753ac..53408a5 100644 --- a/wifi_db_aircrack.py +++ b/utils/wifi_db_aircrack.py @@ -5,9 +5,9 @@ import xml.etree.ElementTree as ET import os import re -import oui +from utils import oui import ftfy -import database_utils +from utils import database_utils import pyshark import subprocess # import platform @@ -89,10 +89,14 @@ def parse_netxml(ouiMap, name, database, verbose): carrier = wireless.find("carrier").text # firstTimeSeen - firstTimeSeen_string = wireless.find("SSID").attrib['first-time'] - date_object = datetime.datetime.strptime(firstTimeSeen_string, "%a %b %d %H:%M:%S %Y") + firstTimeSeen_string = wireless.find( + "SSID" + ).attrib['first-time'] + date_object = datetime.datetime.strptime( + firstTimeSeen_string, "%a %b %d %H:%M:%S %Y" + ) firstTimeSeen = date_object.strftime("%Y-%m-%d %H:%M:%S") - + manuf = oui.get_vendor(ouiMap, bssid) if wireless.find("SSID").find("encryption") is not None: @@ -122,7 +126,6 @@ def parse_netxml(ouiMap, name, database, verbose): freqmhz, carrier, encryption, packets_total, lat, lon, cloaked, mfpc, mfpr, firstTimeSeen) - # client clients = wireless.findall("wireless-client") for client in clients: @@ -130,8 +133,12 @@ def parse_netxml(ouiMap, name, database, verbose): manuf = oui.get_vendor(ouiMap, client_mac) firstTimeSeen_string = client.attrib['first-time'] - date_object = datetime.datetime.strptime(firstTimeSeen_string, "%a %b %d %H:%M:%S %Y") - firstTimeSeen = date_object.strftime("%Y-%m-%d %H:%M:%S") + date_object = datetime.datetime.strptime( + firstTimeSeen_string, "%a %b %d %H:%M:%S %Y" + ) + firstTimeSeen = date_object.strftime( + "%Y-%m-%d %H:%M:%S" + ) packets = client.find("packets") packets_total = packets.find("total").text @@ -174,8 +181,13 @@ def parse_kismet_csv(ouiMap, name, database, verbose): # firstTimeSeen firstTimeSeen_string = row[19] - date_object = datetime.datetime.strptime(firstTimeSeen_string, "%a %b %d %H:%M:%S %Y") - firstTimeSeen = date_object.strftime("%Y-%m-%d %H:%M:%S") + + date_object = datetime.datetime.strptime( + firstTimeSeen_string, "%a %b %d %H:%M:%S %Y" + ) + firstTimeSeen = date_object.strftime( + "%Y-%m-%d %H:%M:%S" + ) manuf = oui.get_vendor(ouiMap, bssid) @@ -245,8 +257,8 @@ def parse_csv(ouiMap, name, database, verbose): errors += database_utils.insertAP( cursor, verbose, bssid, essid[1:], manuf, channel, freq, carrier, encrypt, - packets_total, 0, 0, cloaked, mfpc, mfpr, firstTimeSeen) - + packets_total, 0, 0, cloaked, mfpc, mfpr, + firstTimeSeen) if row and row[0] == "Station MAC": client = True @@ -315,7 +327,7 @@ def parse_log_csv(ouiMap, name, database, verbose, fake_lat, fake_lon): errors += database_utils.insertClients( cursor, verbose, mac, ssid, manuf, typeAux, packets_total, device, time) - + errors += database_utils.insertSeenClient( cursor, verbose, mac, time, 'aircrack-ng', signal_rssi, lat, lon, @@ -387,16 +399,19 @@ def parse_handshakes(name, database, verbose): dst = pkt.wlan.da flag = pkt.eapol.wlan_rsna_keydes_key_info # print(flag) - # IF is the second and the prev is the first one add handshake + # IF is the second and the prev is the first one + # add handshake if flag.find('10a') != -1: # print('handhsake 2 of 4') if (prevFlag.find('08a') - and dst == prevSrc and src == prevDst): # first + and dst == prevSrc and src == prevDst): + # first if verbose: - print("Valid handshake from client " + prevSrc + - " to AP " + prevDst) + print("Valid handshake from client " + + prevSrc + " to AP " + prevDst) errors += database_utils.insertHandshake(cursor, - verbose, dst, + verbose, + dst, src, file) else: prevSrc = src @@ -425,10 +440,14 @@ def parse_MFP(name, database, verbose): cursor = database.cursor() errors = 0 file = name - # cap = pyshark.FileCapture(file, display_filter='wlan.fc.type_subtype == 0x0008') + # cap = pyshark.FileCapture(file, + # display_filter='wlan.fc.type_subtype == 0x0008') # Filter only with mfpr or mfpc enable cap = pyshark.FileCapture(file, - display_filter='((wlan.rsn.capabilities.mfpr == 1)||(wlan.rsn.capabilities.mfpc == 1))&&(wlan.fc.type_subtype == 0x0008)') + display_filter='\ + ((wlan.rsn.capabilities.mfpr == 1)||\ + (wlan.rsn.capabilities.mfpc == 1))&&\ + (wlan.fc.type_subtype == 0x0008)') # cap.set_debug() for pkt in cap: @@ -454,7 +473,8 @@ def parse_MFP(name, database, verbose): print(f"MFPR: {mfpr}") errors += database_utils.insertMFP(cursor, verbose, - src, mfpc, mfpr, file) + src, mfpc, + mfpr, file) # wlan_options = pkt['wlan.mgt'].field_names # print(wlan_options) # print(pkt['wlan.mgt']) @@ -482,7 +502,8 @@ def parse_WPS(name, database, verbose): errors = 0 file = name cap = pyshark.FileCapture( - file, display_filter="wps.wifi_protected_setup_state == 0x02 and wlan.da == ff:ff:ff:ff:ff:ff") + file, display_filter="wps.wifi_protected_setup_state == 0x02 and\ + wlan.da == ff:ff:ff:ff:ff:ff") # cap.set_debug() for pkt in cap: @@ -503,8 +524,8 @@ def parse_WPS(name, database, verbose): except Exception: errors += 1 try: - wlan_ssid_hex = pkt[wmgt].wlan_ssid - wlan_ssid_bytes = binascii.unhexlify(wlan_ssid_hex.replace(':', '')) + w_s_hex = pkt[wmgt].wlan_ssid + wlan_ssid_bytes = binascii.unhexlify(w_s_hex.replace(':', '')) wlan_ssid_decode = wlan_ssid_bytes.decode('ascii') if wlan_ssid_decode != "": wlan_ssid = wlan_ssid_decode @@ -544,9 +565,10 @@ def parse_WPS(name, database, verbose): except Exception: errors += 1 - errors += database_utils.insertWPS(cursor, verbose, bssid, wlan_ssid, - wps_version, wps_device_name, - wps_model_name, wps_model_number, + errors += database_utils.insertWPS(cursor, verbose, bssid, + wlan_ssid, wps_version, + wps_device_name, wps_model_name, + wps_model_number, wps_config_methods, wps_config_methods_keypad) @@ -576,7 +598,8 @@ def parse_identities(name, database, verbose): identity = "" method = "" - # The information is: Identity, method, method... , Identity2, method2, method2... + # The information is: Identity, method, method... , + # Identity2, method2, method2... for pkt in cap: # print(pkt.eapol.field_names) try: diff --git a/wifi_db_database.sql b/utils/wifi_db_database.sql similarity index 100% rename from wifi_db_database.sql rename to utils/wifi_db_database.sql diff --git a/wifi_db.py b/wifi_db.py index 7edc6c0..d57ab6b 100755 --- a/wifi_db.py +++ b/wifi_db.py @@ -3,10 +3,11 @@ # -*- coding: utf-8 -*- import argparse -import wifi_db_aircrack -import database_utils +from utils import wifi_db_aircrack +from utils import update +from utils import database_utils +from utils import oui import os -import oui from os import path import platform import subprocess @@ -17,31 +18,35 @@ # import nest_asyncio ; nest_asyncio.apply() -> # Fix RuntimeError: This event loop is already running” -VERSION = '1.3' - +VERSION = '1.4' def banner(): print(''' - _ __ _ _ _ -__ __(_) / _|(_) __| || |__ -\ \ /\ / /| || |_ | | / _` || '_ \ + _ __ _ _ _ +__ __(_) / _|(_) __| || |__ +\ \ /\ / /| || |_ | | / _` || '_ \\ \ V V / | || _|| | | (_| || |_) | - \_/\_/ |_||_| |_| _____ \__,_||_.__/ - |_____| - by r4ulcl + \_/\_/ |_||_| |_| _____ \__,_||_.__/ + |_____| + by r4ulcl ''') def printVersion(): print("wifi_db version:", VERSION) + def replace_multiple_slashes(string): return re.sub('/+', '/', string) def main(): nest_asyncio.apply() + + # Check for update + update.check_for_update(VERSION) + '''Function main. Parse argument and exec the functions ''' # args parser = argparse.ArgumentParser() @@ -144,7 +149,6 @@ def main(): capture = capture[:-1] capture = replace_multiple_slashes(capture) - if source == "aircrack-ng": print("Parsing file:", capture) # Remove format if any @@ -252,58 +256,73 @@ def process_capture(ouiMap, capture, database, captureFormat = capture + ".kismet.netxml" print("Parsing file:", captureFormat) - if database_utils.checkFileProcessed(cursor, - verbose, captureFormat) == 1 and not force: - print("File","already processed\n") + if ( + database_utils.checkFileProcessed( + cursor, verbose, captureFormat + ) == 1 and not force + ): + print("File", "already processed\n") else: database_utils.insertFile(cursor, verbose, captureFormat) wifi_db_aircrack.parse_netxml(ouiMap, captureFormat, - database, verbose) + database, verbose) database_utils.setFileProcessed(cursor, verbose, captureFormat) captureFormat = capture + ".kismet.csv" print("Parsing file:", captureFormat) - if database_utils.checkFileProcessed(cursor, - verbose, captureFormat) == 1 and not force: - print("File","already processed\n") + if ( + database_utils.checkFileProcessed( + cursor, verbose, captureFormat + ) == 1 and not force + ): + print("File", "already processed\n") else: database_utils.insertFile(cursor, verbose, captureFormat) wifi_db_aircrack.parse_kismet_csv(ouiMap, captureFormat, - database, verbose) + database, verbose) database_utils.setFileProcessed(cursor, verbose, captureFormat) captureFormat = capture + ".csv" print("Parsing file:", captureFormat) - if database_utils.checkFileProcessed(cursor, - verbose, captureFormat) == 1 and not force: - print("File","already processed\n") + if ( + database_utils.checkFileProcessed( + cursor, verbose, captureFormat + ) == 1 and not force + ): + print("File", "already processed\n") else: database_utils.insertFile(cursor, verbose, captureFormat) wifi_db_aircrack.parse_csv(ouiMap, captureFormat, - database, verbose) + database, verbose) database_utils.setFileProcessed(cursor, verbose, captureFormat) captureFormat = capture + ".log.csv" print("Parsing file:", captureFormat) - if database_utils.checkFileProcessed(cursor, - verbose, captureFormat) == 1 and not force: - print("File","already processed\n") + if ( + database_utils.checkFileProcessed( + cursor, verbose, captureFormat + ) == 1 and not force + ): + print("File", "already processed\n") else: database_utils.insertFile(cursor, verbose, captureFormat) wifi_db_aircrack.parse_log_csv(ouiMap, captureFormat, - database, verbose, fake_lat, - fake_lon) + database, verbose, fake_lat, + fake_lon) database_utils.setFileProcessed(cursor, verbose, captureFormat) captureFormat = capture + ".cap" print("Parsing file:", captureFormat) - if database_utils.checkFileProcessed(cursor, - verbose, captureFormat) == 1 and not force: - print("File","already processed\n") + if ( + database_utils.checkFileProcessed( + cursor, verbose, captureFormat + ) == 1 and not force + ): + print("File", "already processed\n") else: database_utils.insertFile(cursor, verbose, captureFormat) wifi_db_aircrack.parse_cap(captureFormat, database, verbose, - hcxpcapngtool, tshark) + hcxpcapngtool, tshark) database_utils.setFileProcessed(cursor, verbose, captureFormat)