Skip to content
Snippets Groups Projects
Commit 792f9080 authored by Václav Bartoš's avatar Václav Bartoš
Browse files

shodan2warden connector added

parent 00d329f0
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Author: Pavla Hlučková
# Václav Bartoš <bartos@cesnet.cz>
from shodan import Shodan
from datetime import datetime
from uuid import uuid4
import json
import os
import argparse
# Global variables
VERBOSE = False
test_category = False
node_name = 'undefined'
def vprint(*args, **kwargs):
# Verbose print
if VERBOSE:
print("[{}] ".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")), end="")
print(*args, **kwargs)
def generateIdeaEvent(detect_time, category, description,ip_string,port_num,proto,content_type,content,note):
create_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
# if there's no timezone in detect_time, assume UTC (which Shodan normally uses) and append 'Z'
if 'Z' not in detect_time and '+' not in detect_time and '-' not in detect_time:
detect_time += 'Z'
event = {
"Format": "IDEA0",
"ID": str(uuid4()),
"Category": [category],
"CreateTime": create_time,
"DetectTime": detect_time,
"Description": description,
"Ref": ["https://www.shodan.io/host/" + ip_string],
"Source": [
{
"IP4": [ip_string],
"Port": [port_num],
"Proto": proto
}
],
"Node": [
{
"Name": node_name,
"SW": ["shodan2warden"],
"Type": ["External", "Recon"]
}
],
"Attach": [
{
"ContentType": content_type,
"Content": content,
"Note": note
}
]
}
if test_category:
event["Category"].append("Test")
filename = "{}_{}_{}.json".format(
datetime.utcnow().strftime('%Y%m%d%H%M%S%f'),
ip_string,
event['ID'][:8])
tmp_destination = os.path.join(default_directory, 'tmp', filename)
inc_destination = os.path.join(default_directory, 'incoming', filename)
with open(tmp_destination, 'w') as json_file:
json.dump(event, json_file)
os.rename(tmp_destination, inc_destination)
def IPMI():
query = asnString+" port:623"
category = "Vulnerable.Config"
description = "Publicly accessible insecure protocol: IPMI"
proto = ["udp", "ipmi"]
content_type = "text/plain"
note = "Original service banner from Shodan"
vprint("Querying for IMPI:", query)
for banner in api.search_cursor(query):
vprint("Found problematic IP:", banner.get('ip_str'))
generateIdeaEvent(banner.get('timestamp'), category, description,
banner.get('ip_str'), banner.get('port'),
proto, content_type, banner.get('data'), note)
def SCADA():
query = asnString+" port:47808"
category = "Vulnerable.Config"
description = "Publicly accessible SCADA (BACnet) system"
proto = ["udp", "bacnet"]
content_type = "text/plain"
note = "Original service banner from Shodan"
vprint("Querying for SCADA:", query)
for banner in api.search_cursor(query):
vprint("Found problematic IP:", banner.get('ip_str'))
generateIdeaEvent(banner.get('timestamp'), category, description,
banner.get('ip_str'), banner.get('port'),
proto, content_type, banner.get('data'), note)
def printerPJL():
query = asnString + " port:9100 PJL INFO STATUS"
category = "Vulnerable.Config"
description = "Vulnerable PJL printer"
proto = []
content_type = "text/plain"
note = "Original service banner from Shodan"
vprint("Querying for printer PLJ:", query)
for banner in api.search_cursor(query):
vprint("Found problematic IP:", banner.get('ip_str'))
generateIdeaEvent(banner.get('timestamp'), category, description,
banner.get('ip_str'), banner.get('port'),
proto, content_type, banner.get('data'), note)
def printerIPP():
query = asnString+" port:631"
category = "Vulnerable.Config"
description = "Potentially vulnerable IPP printer"
proto = ["ipp"]
content_type = "text/plain"
note = "Original service banner from Shodan"
vprint("Querying for printer IPP:", query)
for banner in api.search_cursor(query):
if "close" in banner.get('data'):
continue
vprint("Found problematic IP:", banner.get('ip_str'))
generateIdeaEvent(banner.get('timestamp'), category, description,
banner.get('ip_str'), banner.get('port'),
proto, content_type, banner.get('data'), note)
def mongoDB():
query = asnString+" \"mongodb metrics\""
category = "Vulnerable.Config"
description = "Potentially vulnerable MongoDB database"
proto = ["tcp"]
content_type = "text/plain"
note = "Original service banner from Shodan"
vprint("Querying for MongoDB:", query)
for banner in api.search_cursor(query):
if "\"totalSize\": 0.0" in banner.get('data'):
continue # skip if database is empty
if "hacked" in banner.get('data'):
category = "Information.UnauthorizedModification"
description = "Potentially exploited mongoDB database"
vprint("Found problematic IP:", banner.get('ip_str'))
generateIdeaEvent(banner.get('timestamp'), category, description,
banner.get('ip_str'), banner.get('port'),
proto, content_type, banner.get('data'), note)
def elasticIndices():
query = asnString+" \"Elastic Indices\""
category = "Vulnerable.Config"
description = "Possibly vulnerable data displayed - Elastic Indices"
proto = ["tcp"]
content_type = "text/plain"
note = "Original service banner from Shodan"
vprint("Querying for Elastic Indices:", query)
for banner in api.search_cursor(query):
vprint("Found problematic IP:", banner.get('ip_str'))
generateIdeaEvent(banner.get('timestamp'), category, description,
banner.get('ip_str'), banner.get('port'),
proto, content_type, banner.get('data'), note)
def anonFTP():
query = asnString+" port:21 \"anonymous logged in\""
category = "Vulnerable.Config"
description = "Open anonymous FTP"
proto = ["tcp", "ftp"]
content_type = "text/plain"
note = "Original service banner from Shodan"
vprint("Querying for anonymous FTP:", query)
for banner in api.search_cursor(query):
vprint("Found problematic IP:", banner.get('ip_str'))
generateIdeaEvent(banner.get('timestamp'), category, description,
banner.get('ip_str'), banner.get('port'),
proto, content_type, banner.get('data'), note)
def hacked():
query = asnString+" \"hacked\""
category = "Information.UnauthorizedModification"
description = "Service probably hacked (\"hacked\" string found in banner)"
proto = []
content_type = "text/plain"
note = "Original service banner from Shodan"
vprint("Querying for \"hacked\" string:", query)
for banner in api.search_cursor(query):
vprint("Found problematic IP:", banner.get('ip_str'))
generateIdeaEvent(banner.get('timestamp'), category, description,
banner.get('ip_str'), banner.get('port'),
proto, content_type, banner.get('data'), note)
def unsupportedPHP():
query = asnString+" PHP"
category = "Vulnerable.Open"
description = "Web running on old (unsupported) PHP version"
proto = ["tcp", "http"]
content_type = "text/plain"
note = "Original service banner from Shodan"
vprint("Querying for unsupported PHP:", query)
for banner in api.search_cursor(query):
if "test page" in banner.get('data'):
continue
if "PHP/5" in banner.get('data') or "PHP/4" in banner.get('data'):
vprint("Found problematic IP:", banner.get('ip_str'))
generateIdeaEvent(banner.get('timestamp'), category, description,
banner.get('ip_str'), banner.get('port'),
proto, content_type, banner.get('data'), note)
def parse_args():
# command line argument parser
parser = argparse.ArgumentParser(description="Searches Shodan for potential problems with open services in given ASN. For each such problem generates an IDEA message into gievn directory (to be sent to Warden by warden_filer). This script is assumed to be run daily by cron.")
parser.add_argument('-k', '--apikey', required=True,
help="Shodan API key")
parser.add_argument('-a', '--asn', type=int, required=True,
help="ASN to query")
parser.add_argument('-n', '--node', required=True,
help="Node name to fill into IDEA messages")
parser.add_argument('-d', '--destdir', dest="path", default=os.getcwd(),
help="Path to destination directory (with 'incoming' and 'temp' subdirectories) (default: CWD)")
parser.add_argument('-t', '--test', action="store_true",
help="Add 'Test' category to IDEA messages.")
parser.add_argument('-v', '--verbose', action="store_true",
help="Print information about progress and results")
return parser.parse_args()
def main():
IPMI()
SCADA()
printerPJL()
printerIPP()
mongoDB()
elasticIndices()
anonFTP()
hacked()
unsupportedPHP()
if __name__ == "__main__":
#getting arguments from argparse
args = parse_args()
VERBOSE = args.verbose
default_directory = args.path
node_name = args.node
test_category = args.test
asnString = "asn:as" + str(args.asn)
api = Shodan(args.apikey)
# incoming directory creation
directory = "incoming"
path = os.path.join(default_directory, directory)
os.makedirs(path, exist_ok=True)
# tmp directory creation
directory = "tmp"
path = os.path.join(default_directory, directory)
os.makedirs(path, exist_ok=True)
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment