import csv
import os
import re
import socket
import tarfile
import time
import zipfile
from collections import defaultdict
from StringIO import StringIO

import chardet
from django.conf import settings

from .models import Article, Ticket, Sessions

# TODO: Can't match IDN domains.
domain_re = re.compile(r'(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+'
                       r'(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)', re.IGNORECASE)
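
# Illustrative sketch (an assumption, not part of the original module): the
# pattern accepts plain ASCII host names, but, as the TODO above notes, an
# internationalized domain only matches once converted to its punycode form:
#
#     domain_re.match('www.example.com')                      # matches
#     domain_re.match(u'p\u0159\xedklad.cz')                  # None: raw IDN
#     domain_re.match(u'p\u0159\xedklad.cz'.encode('idna'))   # matches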


def ip_address_check(address):
    """Returns True if address is a valid IPv4 or IPv6 address."""
    try:
        socket.inet_pton(socket.AF_INET, address)
        return True
    except socket.error:
        pass
    try:
        socket.inet_pton(socket.AF_INET6, address)
        return True
    except socket.error:
        return False
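
# Illustrative sketch (an assumption, not part of the original module):
#
#     ip_address_check('192.0.2.1')    # True  (valid IPv4)
#     ip_address_check('2001:db8::1')  # True  (valid IPv6)
#     ip_address_check('example.com')  # False (not an IP address)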


def get_article(article_id):
    """Fetches a single OTRS article from the 'otrs' database by id."""
    return Article.objects.using('otrs').get(id=article_id)


def get_ticket(ticket_id):
    """Fetches a single OTRS ticket from the 'otrs' database by id."""
    return Ticket.objects.using('otrs').get(id=ticket_id)


def get_article_attachments(article, selected_ids=None):
    """
    Returns the list of article attachments suitable for Emails-type remailing.
    """
    def att_similar(a_):
        """
        Checks whether the attachment is very similar to the body. It makes
        little sense to include it then.
        """
        return len(a_.content) > 100 and a_.content == article.a_body

    # content_length, is_text, attach_id are here for our templates,
    # using 0|1 as boolean so it can be transported via SOAP safely
    return [dict(Filename=a.filename,
                 Content=a.content,
                 ContentType=a.content_type,
                 content_length=a.content_size,
                 is_text=1 if a.content_type.lower().startswith('text/') else 0,
                 attach_id=a.id,
                 checked=0 if att_similar(a) else 1)
            for a in article.articleattachment_set.order_by('id').iterator()
            if selected_ids is None or a.id in selected_ids]
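
# Illustrative usage sketch (an assumption, not part of the original module):
#
#     article = get_article(article_id)
#     attachments = get_article_attachments(article)
#     to_remail = [a for a in attachments if a['checked']]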


def iter_zipped_incidents(article):
    """
    Retrieves the ZIP file attachment from the specified OTRS article.
    Generates the name and content from each ZIP member *.txt file.
    """
    TEST_INCIDENTS = getattr(settings, 'TEST_INCIDENTS', None)
    if TEST_INCIDENTS:
        for name, content in TEST_INCIDENTS:
            yield name, content, ('ascii', '100%')
        return
    for att in article.articleattachment_set.iterator():
        if att.content_type.lower().startswith('application/zip'):
            zipf = zipfile.ZipFile(StringIO(att.content))
            for itemname in zipf.namelist():
                if itemname.lower().endswith('.txt'):
                    # Strip the directory part and the '.txt' suffix.
                    name = os.path.split(itemname)[1][:-4]
                    if name.strip():
                        content = zipf.read(itemname)
                        try:
                            enc = chardet.detect(content)
                            encoding = enc['encoding'], '%d%%' % (enc['confidence'] * 100)
                            content = content.decode(encoding[0])
                        except Exception as e:
                            encoding = 'raw_unicode_escape', '(%r)' % e
                            content = content.decode(encoding[0])
                        yield name, content, encoding
            zipf.close()
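
# Illustrative usage sketch (an assumption, not part of the original module);
# every item is (member name, decoded text, (encoding, confidence)):
#
#     for name, content, (enc, confidence) in iter_zipped_incidents(article):
#         handle_incident(name, content)   # hypothetical consumer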


def iter_attachments(article):
    """
    Generates all article attachments: either direct, or packed in tarballs
    and ZIP files. Yields tuples of (file name, file content).
    """
    for att in article.articleattachment_set.iterator():
        fn = att.filename.lower()
        TARTYPES = {
            '.tar.gz': 'r:gz', '.tgz': 'r:gz', '.tar.bz2': 'r:bz2',
            '.tbz': 'r:bz2', '.tbz2': 'r:bz2', '.tar': 'r',
        }
        tartype = [tt for ext, tt in TARTYPES.items() if fn.endswith(ext)]
        if tartype:
            tarf = tarfile.open(fileobj=StringIO(att.content), mode=tartype[0])
            for item in tarf:
                if item.isfile() and item.size:
                    fni = '<tt>%s</tt> from <tt>%s</tt>' % (item.name, att.filename)
                    yield fni, tarf.extractfile(item).read()
            tarf.close()
        elif fn.endswith('.zip'):
            zipf = zipfile.ZipFile(StringIO(att.content))
            for itemname in zipf.namelist():
                if zipf.getinfo(itemname).file_size:
                    fni = '<tt>%s</tt> from <tt>%s</tt>' % (itemname, att.filename)
                    yield fni, zipf.open(itemname).read()
            zipf.close()
        else:
            if len(att.content):
                yield att.filename, att.content
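
# Illustrative usage sketch (an assumption, not part of the original module):
#
#     for filename, content in iter_attachments(article):
#         print filename, len(content)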


def iter_csv_incidents(article, request):
    """
    Splits the selected CSV attachment into per-name incident excerpts.
    Yields (name, csv excerpt, (encoding, confidence)) for every IP address
    or domain name found in the file. request.GET controls which attachment
    is processed (skip_files), the number of header lines (header_lines),
    the delimiter and the encoding.
    """
    try:
        skip_files = int(request.GET['skip_files']) + 1
    except (KeyError, ValueError):
        skip_files = 1
    try:
        header_lines = max(int(request.GET['header_lines']), 0)
    except (KeyError, ValueError):
        header_lines = 0
    request.META.update(
        INCIDENTS_SOURCE_INDEX=skip_files,
        INCIDENTS_SOURCE='<no file>',
        INCIDENTS_PROCESSED_LINES=0,
    )
    delimiter = request.GET.get('delimiter', '')[:1].encode('UTF-8')
    encoding = request.GET.get('encoding') or 'UTF-8'
    confidence = 'default' if encoding.lower() == 'utf-8' else 'forced'
    for filename, content in iter_attachments(article):
        skip_files -= 1
        if skip_files:
            continue
        # Normalize the content to UTF-8 bytes before sniffing and parsing.
        content = content.strip()
        if isinstance(content, unicode):
            content = content.encode('UTF-8', 'ignore')
        elif encoding.lower() != 'utf-8':
            content = content.decode(encoding, 'ignore')
            content = content.encode('UTF-8', 'ignore')
        try:
            dialect = csv.Sniffer().sniff(content, delimiters=',\t;:')
        except csv.Error:
            dialect = None
        if delimiter:
            dialect = dialect or csv.excel
            dialect.delimiter = delimiter
        elif ' ' not in content and '\t' not in content:
            dialect = dialect or csv.excel
        names = defaultdict(list)
        headers = content.split('\n', header_lines)
        content = headers.pop(-1)
        rows = content.split('\n')
        if dialect:
            rows = csv.reader(rows, dialect)
        for row in rows:
            request.META['INCIDENTS_PROCESSED_LINES'] += 1
            for value in row:
                if ip_address_check(value) or domain_re.match(value):
                    if dialect:
                        # need to assemble the row back
                        out = StringIO()
                        csv.writer(out, dialect).writerow(row)
                        names[value].append(out.getvalue())
                    else:
                        names[value].append(row)
        _h = '\n'.join(headers) + '\n'
        for name, rowlist in names.iteritems():
            yield name, _h + '\n'.join(rowlist), (encoding, confidence)
        request.META.update(
            INCIDENTS_SOURCE=filename,
            INCIDENTS_DELIMITER=dialect.delimiter if dialect else None,
            INCIDENTS_HEADER_LINES=len(headers),
            INCIDENTS_ENCODING=encoding,
        )
        # process only one file
        return
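
# Illustrative usage sketch (an assumption, not part of the original module);
# 'request' is the Django request whose GET parameters (skip_files,
# header_lines, delimiter, encoding) drive the parsing:
#
#     for name, excerpt, (enc, confidence) in iter_csv_incidents(article, request):
#         ...  # e.g. open one report per offending IP address or domain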


def get_otrs_session(otrs_session_id):
    """Loads, decodes and validates the OTRS session; returns it as a dict."""
    # Environment variable override for the session id.
    otrs_session_id = os.getenv('FORCE_OTRS_SESSION_ID') or otrs_session_id
    session_qs = Sessions.objects.using('otrs').values_list('session_value', flat=True)
    session_value = session_qs.get(session_id=otrs_session_id)

    def decode_sessval(sv):
        key, value, unused_rest = sv.split(':', 2)
        return key, value.decode('base64')

    session = dict(decode_sessval(sv) for sv in session_value.split(';') if sv)
    if not session.get('UserID') or not session.get('UserLogin'):
        raise RuntimeError("Missing UserID or UserLogin in OTRS session.")
    # Intentionally skipping REMOTE_ADDR comparison for debugging.
    # session['UserRemoteAddr'] != request.META['REMOTE_ADDR']
    # OTRS defaults
    SessionMaxIdleTime = 5 * 60 * 60
    SessionMaxTime = 10 * 60 * 60
    now = time.time()
    # Session values are base64-decoded strings, so compare them as numbers.
    if (now - SessionMaxIdleTime >= float(session['UserLastRequest'])
            or now - SessionMaxTime >= float(session['UserSessionStart'])):
        raise RuntimeError("Session has timed out. Please log in to OTRS again.")
    return session
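
# Illustrative usage sketch (an assumption, not part of the original module);
# the session id would typically come from the OTRS session cookie:
#
#     session = get_otrs_session(otrs_session_id)
#     user_id, user_login = session['UserID'], session['UserLogin']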