Skip to content
Snippets Groups Projects
Commit abc70e5f authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Add ipv6 support

parent 1e2d74cf
No related branches found
No related tags found
No related merge requests found
......@@ -3,13 +3,13 @@ import chardet
import csv
import os
import re
import socket
import tarfile
import time
import zipfile
from collections import defaultdict
from django.conf import settings
from django.core.validators import ipv4_re
from .models import Article, Ticket, Sessions
......@@ -19,6 +19,19 @@ domain_re = re.compile(r'(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+'
r'(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)', re.IGNORECASE)
def ip_address_check(address):
try:
socket.inet_pton(socket.AF_INET, address)
return True
except socket.error:
pass
try:
socket.inet_pton(socket.AF_INET6, address)
return True
except socket.error:
return False
def get_article(article_id):
return Article.objects.using('otrs').get(id=article_id)
......@@ -173,7 +186,7 @@ def iter_csv_incidents(article, request):
request.META['INCIDENTS_PROCESSED_LINES'] += 1
for value in row:
if ipv4_re.match(value) or domain_re.match(value):
if ip_address_check(value) or domain_re.match(value):
if dialect:
# need to assemble the row back
out = StringIO()
......
......@@ -2,10 +2,10 @@ from collections import namedtuple
import json
import re
import socket
import struct
import urllib
from django.conf import settings
from django.core.validators import ipv4_re
from useful.django.caching import cached_function
......@@ -19,6 +19,19 @@ FQDN_RE = re.compile(r'^(?=.{1,255}$)[0-9A-Z](?:(?:[0-9A-Z]|-){0,61}[0-9A-Z])?'
'(?:\.[0-9A-Z](?:(?:[0-9A-Z]|-){0,61}[0-9A-Z])?)*\.?$', re.IGNORECASE)
def ip_address_check(address):
try:
socket.inet_pton(socket.AF_INET, address)
return True
except socket.error:
pass
try:
socket.inet_pton(socket.AF_INET6, address)
return True
except socket.error:
return False
@cached_function
def gethostbyname(host):
try:
......@@ -31,7 +44,7 @@ def gethostbyname(host):
def whois(server, query):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((server, 43))
s.send('%s\r\n' % query)
s.send('-r %s\r\n' % query)
d, response = True, ''
while d:
d = s.recv(1024)
......@@ -92,7 +105,7 @@ def query_RIPE(ip):
BLACKLIST_SET = set(email.strip().lower() for email in settings.REMAILER_WHOIS_BLACKLIST)
INTERESTING_FIELDS = 'first last netname resolved_abuses'
NegistryBlock = namedtuple('NegistryBlock', INTERESTING_FIELDS)
NegistryBlock = namedtuple('NegistryBlock', INTERESTING_FIELDS + ' proto')
@cached_function
......@@ -103,25 +116,34 @@ def get_negistry_tuples():
if key.startswith('_'):
continue
negistry_tuple = NegistryBlock(*(block[k] for k in INTERESTING_FIELDS.split(' ')))
negistry_tuple = NegistryBlock(*([block[k] for k in INTERESTING_FIELDS.split(' ')] + ["6" if "ip6_addr" in block else "4"]))
negistry.append(negistry_tuple)
return negistry
def ipv4_to_integer(ipv4):
return reduce(lambda a, b: a << 8 | b, map(int, ipv4.split(".")))
def ip_to_int(s):
try:
return struct.unpack("!L", socket.inet_pton(socket.AF_INET, s))[0], "4"
except Exception:
pass
try:
hi, lo = struct.unpack("!QQ", socket.inet_pton(socket.AF_INET6, s))
return hi << 64 | lo, "6"
except Exception:
raise ValueError("Wrong IP address format: %s" % s)
def query_Negistry(ip):
if settings.USE_NEGISTRY_API:
host_integer = ipv4_to_integer(ip)
host_integer, proto = ip_to_int(ip)
best = None
for tup in get_negistry_tuples():
# Is there matching range?
if tup.first <= host_integer <= tup.last:
if tup.proto == proto and tup.first <= host_integer <= tup.last:
# If so, pick up the smallest one.
if not best or (best.last-best.first) > (tup.last-tup.first):
......@@ -139,7 +161,7 @@ def whois_host(host):
host = host.strip()
domain, ip = '', None
if ipv4_re.match(host):
if ip_address_check(host):
ip = host
elif FQDN_RE.match(host):
ip = gethostbyname(host)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment