Skip to content
Snippets Groups Projects
Select Git revision
  • abc70e5fe25dfa4be5207c0ef6329f4a934b83ec
  • master default protected
  • stable
3 results

utils.py

Blame
  • utils.py 8.00 KiB
    from StringIO import StringIO
    import chardet
    import csv
    import os
    import re
    import socket
    import tarfile
    import time
    import zipfile
    from collections import defaultdict
    
    from django.conf import settings
    
    from .models import Article, Ticket, Sessions
    
    
    # TODO: Can't match IDN domains.
    # Loose hostname matcher: one or more dot-separated labels (letters,
    # digits, hyphens; max 63 chars; no leading/trailing hyphen), ending in
    # either a 2-6 letter TLD or a longer alphanumeric label; an optional
    # trailing dot is accepted.  Case-insensitive.
    domain_re = re.compile(r'(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+'
                           r'(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)', re.IGNORECASE)
    
    
    def ip_address_check(address):
        """
        Return True when *address* parses as a valid IPv4 or IPv6 address
        (per socket.inet_pton), False otherwise.
        """
        for family in (socket.AF_INET, socket.AF_INET6):
            try:
                socket.inet_pton(family, address)
            except socket.error:
                continue
            return True
        return False
    
    
    def get_article(article_id):
        """Fetch a single Article row from the 'otrs' database by id."""
        articles = Article.objects.using('otrs')
        return articles.get(id=article_id)
    
    
    def get_ticket(ticket_id):
        """Fetch a single Ticket row from the 'otrs' database by id."""
        tickets = Ticket.objects.using('otrs')
        return tickets.get(id=ticket_id)
    
    
    def get_article_attachments(article, selected_ids=None):
        """
        Build the list of attachment dicts used for Emails-type remailing.

        Each dict carries SOAP-safe fields (0|1 instead of booleans):
        Filename/Content/ContentType plus content_length, is_text,
        attach_id and checked.  When selected_ids is given, only the
        attachments with those ids are included.
        """
        def _duplicates_body(att):
            # An attachment that essentially repeats the article body is
            # pre-unchecked -- re-sending it would add nothing.
            return len(att.content) > 100 and att.content == article.a_body

        result = []
        for att in article.articleattachment_set.order_by('id').iterator():
            if selected_ids is not None and att.id not in selected_ids:
                continue
            is_text = 1 if att.content_type.lower().startswith('text/') else 0
            result.append(dict(Filename=att.filename,
                               Content=att.content,
                               ContentType=att.content_type,
                               content_length=att.content_size,
                               is_text=is_text,
                               attach_id=att.id,
                               checked=0 if _duplicates_body(att) else 1))
        return result
    
    
    def iter_zipped_incidents(article):
        """
        Yield (name, content, (encoding, confidence)) for every *.txt member
        of each application/zip attachment on the given OTRS article.

        The member's directory part and '.txt' suffix are stripped from the
        name; members with a blank name are skipped.  Content is decoded via
        chardet detection, falling back to raw_unicode_escape on any error.
        settings.TEST_INCIDENTS, when set, short-circuits with fixture data.
        """
        test_incidents = getattr(settings, 'TEST_INCIDENTS', None)
        if test_incidents:
            for name, content in test_incidents:
                yield name, content, ('ascii', '100%')
            return

        for att in article.articleattachment_set.iterator():
            if not att.content_type.lower().startswith('application/zip'):
                continue
            zipf = zipfile.ZipFile(StringIO(att.content))
            for member in zipf.namelist():
                if not member.lower().endswith('.txt'):
                    continue
                name = os.path.split(member)[1][:-4]
                if not name.strip():
                    continue
                content = zipf.read(member)
                try:
                    detected = chardet.detect(content)
                    encoding = (detected['encoding'],
                                '%d%%' % (detected['confidence'] * 100))
                    content = content.decode(encoding[0])
                except Exception as e:
                    # Best effort: a lossless fallback decode, with the
                    # failure reason reported in the confidence slot.
                    encoding = 'raw_unicode_escape', '(%r)' % e
                    content = content.decode(encoding[0])
                yield name, content, encoding

            zipf.close()
    
    
    def iter_attachments(article):
        """
        Generate all article attachs: either direct, in tarballs and in ZIPs.

        Yields (file name, file content) tuples.  Members of tar/ZIP archives
        get a '<tt>member</tt> from <tt>archive</tt>' display name; empty
        plain attachments and empty archive members are skipped.
        """
        # Archive extension -> tarfile open mode.  Hoisted out of the loop:
        # the original rebuilt this dict once per attachment for no benefit.
        TARTYPES = {
            '.tar.gz': 'r:gz', '.tgz': 'r:gz', '.tar.bz2': 'r:bz2',
            '.tbz': 'r:bz2', '.tbz2': 'r:bz2', '.tar': 'r',
        }
        for att in article.articleattachment_set.iterator():
            fn = att.filename.lower()
            tartype = [tt for ext, tt in TARTYPES.items() if fn.endswith(ext)]
            if tartype:
                tarf = tarfile.open(fileobj=StringIO(att.content), mode=tartype[0])
                for item in tarf:
                    # Skip directories, links and zero-length members.
                    if item.isfile() and item.size:
                        fni = '<tt>%s</tt> from <tt>%s</tt>' % (item.name, att.filename)
                        yield fni, tarf.extractfile(item).read()
                tarf.close()

            elif fn.endswith('.zip'):
                zipf = zipfile.ZipFile(StringIO(att.content))
                for itemname in zipf.namelist():
                    if zipf.getinfo(itemname).file_size:
                        fni = '<tt>%s</tt> from <tt>%s</tt>' % (itemname, att.filename)
                        yield fni, zipf.open(itemname).read()
                zipf.close()
            else:
                if len(att.content):
                    yield att.filename, att.content
    
    
    def iter_csv_incidents(article, request):
        """
        Split one CSV-like attachment into per-name incident row sets.

        Reads tuning parameters from request.GET (skip_files, header_lines,
        delimiter, encoding), processes exactly one attachment (the
        skip_files-th one), and yields (name, header+matching rows text,
        (encoding, confidence)) for every cell value that looks like an IP
        address or a domain name.  Progress and diagnostic values are stored
        into request.META.
        """
        try:
            skip_files = int(request.GET['skip_files']) + 1
        except (KeyError, ValueError, TypeError):
            # Parameter missing or malformed: start with the first file.
            skip_files = 1

        try:
            header_lines = max(int(request.GET['header_lines']), 0)
        except (KeyError, ValueError, TypeError):
            header_lines = 0

        request.META.update(
            INCIDENTS_SOURCE_INDEX=skip_files,
            INCIDENTS_SOURCE='&lt;no file&gt;',
            INCIDENTS_PROCESSED_LINES=0,
        )

        delimiter = request.GET.get('delimiter', '')[:1].encode('UTF-8')
        encoding = request.GET.get('encoding') or 'UTF-8'
        confidence = 'default' if encoding.lower() == 'utf-8' else 'forced'

        for filename, content in iter_attachments(article):
            skip_files -= 1
            if skip_files:
                continue

            content = content.strip()

            # Normalize to UTF-8 byte strings before sniffing/parsing.
            if isinstance(content, unicode):
                content = content.encode('UTF-8', 'ignore')
            elif encoding.lower() != 'utf-8':
                content = content.decode(encoding, 'ignore')
                content = content.encode('UTF-8', 'ignore')

            try:
                dialect = csv.Sniffer().sniff(content, delimiters=',\t;:')
            except csv.Error:
                dialect = None

            if delimiter:
                # Never assign onto the shared csv.excel class itself: that
                # would leak the forced delimiter into every later csv.excel
                # user process-wide.  A private subclass keeps it local.
                # (Sniffed dialects are fresh per-call classes, so setting
                # the attribute on those is safe.)
                dialect = dialect or type('_ForcedDialect', (csv.excel,), {})
                dialect.delimiter = delimiter
            elif ' ' not in content and '\t' not in content:
                dialect = dialect or csv.excel

            names = defaultdict(list)

            # Cut off the requested number of header lines; the last split
            # element is the actual data.
            headers = content.split('\n', header_lines)
            content = headers.pop(-1)

            rows = content.split('\n')

            if dialect:
                rows = csv.reader(rows, dialect)

            for row in rows:
                request.META['INCIDENTS_PROCESSED_LINES'] += 1

                for value in row:
                    if ip_address_check(value) or domain_re.match(value):
                        if dialect:
                            # need to assemble the row back
                            out = StringIO()
                            csv.writer(out, dialect).writerow(row)
                            names[value].append(
                                out.getvalue()
                            )
                        else:
                            names[value].append(row)

            _h = '\n'.join(headers) + '\n'
            for name, rowlist in names.iteritems():
                yield name, _h + '\n'.join(rowlist), (encoding, confidence)

            request.META.update(
                INCIDENTS_SOURCE=filename,
                INCIDENTS_DELIMITER=dialect.delimiter if dialect else None,
                INCIDENTS_HEADER_LINES=len(headers),
                INCIDENTS_ENCODING=encoding,
            )

            # process only one file
            return
    
    
    def get_otrs_session(otrs_session_id):
        """
        Load and validate an OTRS session by its session id.

        Returns the session as a dict of decoded key/value pairs.  Raises
        RuntimeError when user identification is missing or the session has
        exceeded the OTRS idle/total-time limits.  The session id may be
        overridden with the FORCE_OTRS_SESSION_ID env var (debugging aid).
        """
        otrs_session_id = os.getenv('FORCE_OTRS_SESSION_ID') or otrs_session_id

        session_qs = Sessions.objects.using('otrs').values_list('session_value', flat=True)
        session_value = session_qs.get(session_id=otrs_session_id)

        def decode_sessval(sv):
            # Each ';'-separated entry is "<key>:<base64 value>:<rest>";
            # only the first two fields matter.
            key, value, unused_rest = sv.split(':', 2)
            return key, value.decode('base64')

        session = dict(decode_sessval(sv) for sv in session_value.split(';') if sv)
        if not session.get('UserID') or not session.get('UserLogin'):
            raise RuntimeError("Missing UserID or UserLogin in OTRS session.")

        # Intentionally skipping REMOTE_ADDR comparison for debugging.
        # session['UserRemoteAddr'] != request.META['REMOTE_ADDR']

        # OTRS defaults
        SessionMaxIdleTime = 5*60*60
        SessionMaxTime = 10*60*60

        # The decoded session values are strings; compare as numbers.  In
        # Python 2, `float >= str` is always False (numbers order before
        # strings), so without these casts the timeout check could never
        # fire and stale sessions were accepted forever.
        last_request = float(session['UserLastRequest'])
        session_start = float(session['UserSessionStart'])

        now = time.time()
        if now - SessionMaxIdleTime >= last_request or now - SessionMaxTime >= session_start:
            raise RuntimeError("Session has timed out. Please log in to OTRS again.")

        return session