Skip to content
Snippets Groups Projects
Commit 24a68103 authored by Jan Mach's avatar Jan Mach
Browse files

Implemented prototype SQL-based whois service.

(Redmine issue: #3752)
parent d54b32c2
No related branches found
No related tags found
No related merge requests found
...@@ -228,7 +228,37 @@ class TestMentatWhois(unittest.TestCase): ...@@ -228,7 +228,37 @@ class TestMentatWhois(unittest.TestCase):
self.assertEqual(whois.lookup_abuse('2001:718:1:6::134:183'), ['abuse@cesnet.cz']) self.assertEqual(whois.lookup_abuse('2001:718:1:6::134:183'), ['abuse@cesnet.cz'])
pprint(whois.lookup('abuse@cesnet.cz')) pprint(whois.lookup('abuse@cesnet.cz'))
def test_03_file(self): def test_03_sql(self):
"""
Perform basic operativity tests.
"""
self.maxDiff = None
mentat.services.sqlstorage.init(
{
"__core__database": {
"sqlstorage": {
"url": "postgresql://mentat:mentat@localhost/mentat_main",
"echo": False
}
}
}
)
whois = mentat.whois.WhoisService([
mentat.whois.SqldbWhoisModule().setup()
])
#for mod in whois.modules:
# pprint(mod)
# pprint(mod.networks_ip4)
pprint(whois.lookup('195.113.134.183'))
pprint(whois.lookup('2001:718:1:6::134:183'))
self.assertEqual(whois.lookup_abuse('195.113.134.183'), ['abuse@cesnet.cz'])
self.assertEqual(whois.lookup_abuse('2001:718:1:6::134:183'), ['abuse@cesnet.cz'])
pprint(whois.lookup('abuse@cesnet.cz'))
def test_04_file(self):
""" """
Perform basic operativity tests. Perform basic operativity tests.
""" """
...@@ -244,7 +274,7 @@ class TestMentatWhois(unittest.TestCase): ...@@ -244,7 +274,7 @@ class TestMentatWhois(unittest.TestCase):
pprint(whois.lookup('78.128.214.67')) pprint(whois.lookup('78.128.214.67'))
self.assertEqual(whois.lookup_abuse('78.128.214.67'), ['abuse@cesnet.cz']) self.assertEqual(whois.lookup_abuse('78.128.214.67'), ['abuse@cesnet.cz'])
def test_04_file(self): def test_05_files(self):
""" """
Perform basic operativity tests. Perform basic operativity tests.
""" """
...@@ -262,7 +292,7 @@ class TestMentatWhois(unittest.TestCase): ...@@ -262,7 +292,7 @@ class TestMentatWhois(unittest.TestCase):
self.assertEqual(whois.lookup_abuse('78.128.214.67'), ['abuse@cuni.cz']) self.assertEqual(whois.lookup_abuse('78.128.214.67'), ['abuse@cuni.cz'])
pprint(whois.lookup('abuse@cuni.cz')) pprint(whois.lookup('abuse@cuni.cz'))
def test_05_service_manager(self): def test_06_service_manager(self):
""" """
Perform basic operativity tests. Perform basic operativity tests.
""" """
......
...@@ -30,8 +30,9 @@ import copy ...@@ -30,8 +30,9 @@ import copy
# #
import ipranges import ipranges
import mentat.const import mentat.const
import mentat.storage import mentat.datatype.sqldb
import mentat.datatype.internal import mentat.datatype.internal
import mentat.services.sqlstorage
from mentat.const import CKEY_CORE_SERVICES, CKEY_CORE_SERVICES_WHOIS from mentat.const import CKEY_CORE_SERVICES, CKEY_CORE_SERVICES_WHOIS
...@@ -238,6 +239,41 @@ class MongodbWhoisModule(WhoisModule): ...@@ -238,6 +239,41 @@ class MongodbWhoisModule(WhoisModule):
# Let the parent implementation take care of loading internal lookup table. # Let the parent implementation take care of loading internal lookup table.
return super().setup(networks) return super().setup(networks)
class SqldbWhoisModule(WhoisModule):
"""
Implementation of whois module capable of lookup in given SQL database table.
"""
def __init__(self, storage_manager = None):
super().__init__()
self.stmng = storage_manager
def setup(self):
"""
Perform full setup of internal whois lookup table by loading the data from
MongoDB collection.
"""
# Obtain reference for storage manager, if necessary.
if not self.stmng:
self.stmng = mentat.services.sqlstorage.manager()
storage = self.stmng.service()
# Load network records from SQL database.
records = storage.session.query(mentat.datatype.sqldb.NetworkModel).all()
# Generate list of internal network record objects.
networks = []
for rec in records:
netw = mentat.datatype.internal.t_network_record({
'network': rec.network,
'resolved_abuses': [rec.group.name]
})
networks.append(netw)
# Let the parent implementation take care of loading internal lookup table.
return super().setup(networks)
class WhoisService: class WhoisService:
""" """
Implementation of more complex whois service capable of encapsulating multiple Implementation of more complex whois service capable of encapsulating multiple
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment