From 32da7adae9a4d8367e576e4e7c2a3143ddeb1c87 Mon Sep 17 00:00:00 2001 From: Dom Amato Date: Fri, 10 Apr 2020 19:36:11 -0500 Subject: [PATCH] Python3 (#1) --- .github/workflows/pythontest.yml | 40 ++ README => README.md | 6 +- doc/conf.py | 12 +- doc/modules/utils.rst | 4 +- doc/tutorial/mms.rst | 16 +- doc/tutorial/sms.rst | 16 +- messaging/mms/iterator.py | 12 +- messaging/mms/message.py | 4 +- messaging/mms/mms_pdu.py | 59 ++- messaging/mms/wsp_pdu.py | 142 ++--- messaging/sms/__init__.py | 4 +- messaging/sms/base.py | 2 +- messaging/sms/deliver.py | 20 +- messaging/sms/gsm0338.py | 489 +++++++++--------- messaging/sms/pdu.py | 2 +- messaging/sms/submit.py | 59 ++- messaging/sms/udh.py | 6 +- messaging/sms/wap.py | 2 +- messaging/test/test_gsm_encoding.py | 267 ---------- messaging/utils.py | 113 +++- resources/pydump.py | 2 +- setup.py | 38 +- {messaging/test => tests}/__init__.py | 0 .../27d0a048cd79555de05283a22372b0eb.mms | Bin {messaging/test => tests}/mms-data/BTMMS.MMS | Bin {messaging/test => tests}/mms-data/NOWMMS.MMS | Bin .../test => tests}/mms-data/SEC-SGHS300M.mms | Bin {messaging/test => tests}/mms-data/SIMPLE.MMS | Bin .../mms-data/SonyEricssonT310-R201.mms | Bin .../test => tests}/mms-data/TOMSLOT.MMS | Bin .../test => tests}/mms-data/gallery2test.mms | Bin {messaging/test => tests}/mms-data/iPhone.mms | Bin .../mms-data/images_are_cut_off_debug.mms | Bin {messaging/test => tests}/mms-data/m.mms | Bin .../test => tests}/mms-data/openwave.mms | Bin .../mms-data/projekt_exempel.mms | Bin tests/test_gsm_encoding.py | 256 +++++++++ {messaging/test => tests}/test_mms.py | 71 +-- {messaging/test => tests}/test_sms.py | 95 ++-- {messaging/test => tests}/test_udh.py | 10 +- {messaging/test => tests}/test_wap.py | 23 +- 41 files changed, 946 insertions(+), 824 deletions(-) create mode 100644 .github/workflows/pythontest.yml rename README => README.md (76%) delete mode 100644 messaging/test/test_gsm_encoding.py rename {messaging/test => tests}/__init__.py (100%) rename {messaging/test => tests}/mms-data/27d0a048cd79555de05283a22372b0eb.mms (100%) rename {messaging/test => tests}/mms-data/BTMMS.MMS (100%) rename {messaging/test => tests}/mms-data/NOWMMS.MMS (100%) rename {messaging/test => tests}/mms-data/SEC-SGHS300M.mms (100%) rename {messaging/test => tests}/mms-data/SIMPLE.MMS (100%) rename {messaging/test => tests}/mms-data/SonyEricssonT310-R201.mms (100%) rename {messaging/test => tests}/mms-data/TOMSLOT.MMS (100%) rename {messaging/test => tests}/mms-data/gallery2test.mms (100%) rename {messaging/test => tests}/mms-data/iPhone.mms (100%) rename {messaging/test => tests}/mms-data/images_are_cut_off_debug.mms (100%) rename {messaging/test => tests}/mms-data/m.mms (100%) rename {messaging/test => tests}/mms-data/openwave.mms (100%) rename {messaging/test => tests}/mms-data/projekt_exempel.mms (100%) create mode 100644 tests/test_gsm_encoding.py rename {messaging/test => tests}/test_mms.py (71%) rename {messaging/test => tests}/test_sms.py (83%) rename {messaging/test => tests}/test_udh.py (71%) rename {messaging/test => tests}/test_wap.py (88%) diff --git a/.github/workflows/pythontest.yml b/.github/workflows/pythontest.yml new file mode 100644 index 0000000..7d2db5e --- /dev/null +++ b/.github/workflows/pythontest.yml @@ -0,0 +1,40 @@ +# This workflow will install Python dependencies, run tests and lint with a variety of Python versions +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Python Test + +on: + push: + branches: + - '**' + pull_request: + branches: [ master ] + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.5, 3.6, 3.7, 3.8] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + - name: Lint + run: | + pip install pylint + pylint -rn --errors-only ./messaging + - name: Test + env: + COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} + run: | + pip install coveralls pytest-cov + pytest --cov=messaging tests/ + coveralls diff --git a/README b/README.md similarity index 76% rename from README rename to README.md index 210e792..b00be57 100644 --- a/README +++ b/README.md @@ -1,5 +1,7 @@ -What is python-messaging? -========================= +# python-messaging + +[![Python Test](https://github.com/DomAmato/python-messaging/workflows/Python%20Test/badge.svg)](https://github.com/DomAmato/python-messaging/actions) +[![Coverage Status](https://coveralls.io/repos/github/DomAmato/python-messaging/badge.svg?branch=master)](https://coveralls.io/github/DomAmato/python-messaging?branch=master) A SMS/MMS encoder/decoder written 100% in Python. diff --git a/doc/conf.py b/doc/conf.py index 84f79bb..aa0ee98 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -20,8 +20,8 @@ master_doc = 'index' # General information about the project. -project = u'python-messaging' -copyright = u'2010, Pablo Martí' +project = 'python-messaging' +copyright = '2010, Pablo Martí' # The short X.Y version. version = '0.5.9' @@ -150,8 +150,8 @@ # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, documentclass [howto/manual]). latex_documents = [ - ('index', 'python-messaging.tex', u'python-messaging Documentation', - u'Pablo Martí', 'manual'), + ('index', 'python-messaging.tex', 'python-messaging Documentation', + 'Pablo Martí', 'manual'), ] # The name of an image file (relative to this directory) to place at the top of @@ -183,6 +183,6 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - ('index', 'python-messaging', u'python-messaging Documentation', - [u'Pablo Martí'], 1) + ('index', 'python-messaging', 'python-messaging Documentation', + ['Pablo Martí'], 1) ] diff --git a/doc/modules/utils.rst b/doc/modules/utils.rst index 21f89ab..db854a2 100644 --- a/doc/modules/utils.rst +++ b/doc/modules/utils.rst @@ -15,9 +15,7 @@ Functions .. autofunction:: bytes_to_str -.. autofunction:: to_array - -.. autofunction:: to_bytes +.. autofunction:: hex_to_int_array .. autofunction:: swap diff --git a/doc/tutorial/mms.rst b/doc/tutorial/mms.rst index 65df390..db31614 100644 --- a/doc/tutorial/mms.rst +++ b/doc/tutorial/mms.rst @@ -75,7 +75,7 @@ for a plain HTTP POST:: data = buf.getvalue() buf.close() - print "PROXY RESPONSE", data + print("PROXY RESPONSE", data) Encoding a m-notifyresp-ind PDU @@ -110,8 +110,8 @@ MMS, you just need to:: # data is an array.array("B") instance mms = MMSMessage.from_data(data) - print mms.headers['Message-Type'] # m-send-req - print mms.headers['To'] # '+34231342234/TYPE=PLMN' + print(mms.headers['Message-Type'] # m-send-req) + print(mms.headers['To'] # '+34231342234/TYPE=PLMN') Decoding from a file @@ -125,8 +125,8 @@ need the path to the file and:: path = '/tmp/binary-mms.bin' mms = MMSMessage.from_file(path) - print mms.headers['Message-Type'] # m-send-req - print mms.headers['To'] # '+34231342234/TYPE=PLMN' + print(mms.headers['Message-Type'] # m-send-req) + print(mms.headers['To'] # '+34231342234/TYPE=PLMN') Obtaining a MMS from a WAP push notification @@ -144,7 +144,7 @@ headers:: "0791447758100650400E80885810000000810004016082415464408C0C08049F8E020105040B8423F00106226170706C69636174696F6E2F766E642E7761702E6D6D732D6D65737361676500AF848C82984E4F4B3543694B636F544D595347344D4253774141734B7631344655484141414141414141008D908919802B3434373738353334323734392F545950453D504C4D4E008A808E0274008805810301194083687474703A2F", "0791447758100650440E8088581000000081000401608241547440440C08049F8E020205040B8423F02F70726F6D6D732F736572766C6574732F4E4F4B3543694B636F544D595347344D4253774141734B763134465548414141414141414100", ] - data = "" + data = b"" sms = SmsDeliver(pdus[0]) data += sms.text @@ -154,7 +154,7 @@ headers:: mms = extract_push_notification(data) url = mms.headers['Content-Location'] - print url + print(url) Once you have the content location, you need to do a HTTP GET to retrieve @@ -184,4 +184,4 @@ the MMS payload:: buf.close() mms = MMSMessage.from_data(data) - print mms + print(mms) diff --git a/doc/tutorial/sms.rst b/doc/tutorial/sms.rst index 7277224..8aae92a 100644 --- a/doc/tutorial/sms.rst +++ b/doc/tutorial/sms.rst @@ -36,7 +36,7 @@ How to encode a single part SMS ready to be sent:: sms = SmsSubmit("+44123231231", "hey how's it going?") pdu = sms.to_pdu()[0] - print pdu.length, pdu.pdu + print(pdu.length, pdu.pdu) How to encode a concatenated SMS ready to be sent:: @@ -45,7 +45,7 @@ How to encode a concatenated SMS ready to be sent:: sms = SmsSubmit("+44123231231", "hey " * 50) for pdu in sms.to_pdu(): - print pdu.length, pdu.pdu + print(pdu.length, pdu.pdu) Setting class @@ -59,7 +59,7 @@ Setting the SMS class (0-3) is a no brainer:: sms.class = 0 pdu = sms.to_pdu()[0] - print pdu.length, pdu.pdu + print(pdu.length, pdu.pdu) Setting validity @@ -78,7 +78,7 @@ Setting absolute validity:: sms.validity = datetime(2010, 12, 31, 23, 59, 59) pdu = sms.to_pdu()[0] - print pdu.length, pdu.pdu + print(pdu.length, pdu.pdu) Setting relative validity:: @@ -90,7 +90,7 @@ Setting relative validity:: sms.validity = timedelta(hours=5) pdu = sms.to_pdu()[0] - print pdu.length, pdu.pdu + print(pdu.length, pdu.pdu) Decoding @@ -103,10 +103,10 @@ term:`PDU` decoding is really simple with :class:`~messaging.sms.SmsDeliver`:: pdu = "0791447758100650040C914497726247010000909010711423400A2050EC468B81C4733A" sms = SmsDeliver(pdu) - print sms.data + print(sms.data) # {'csca': '+447785016005', 'type': None, # 'date': datetime.datetime(2009, 9, 1, 16, 41, 32), - # 'text': u' 1741 bst', 'fmt': 0, 'pid': 0, + # 'text': ' 1741 bst', 'fmt': 0, 'pid': 0, # 'dcs': 0, 'number': '+447927267410'} Apart from the pdu, the :py:meth:`messaging.sms.SmsDeliver.__init__` accepts a @@ -135,7 +135,7 @@ registered:: # prompt appears (a more robust implementation # would wait till the prompt appeared) ser.write('AT+CMGS=%d\r' % pdu.length) - print ser.readlines() + print(ser.readlines()) # write the PDU and send a Ctrl+z escape ser.write('%s\x1a' % pdu.pdu) ser.close() diff --git a/messaging/mms/iterator.py b/messaging/mms/iterator.py index cd49505..a1b6fc1 100644 --- a/messaging/mms/iterator.py +++ b/messaging/mms/iterator.py @@ -15,11 +15,11 @@ """Iterator with "value preview" capability.""" -class PreviewIterator(object): +class PreviewIterator: """An ``iter`` wrapper class providing a "previewable" iterator. This "preview" functionality allows the iterator to return successive - values from its ``iterable`` object, without actually mvoving forward + values from its ``iterable`` object, without actually moving forward itself. This is very usefuly if the next item(s) in an iterator must be used for something, after which the iterator should "undo" those read operations, so that they can be read again by another function. @@ -32,15 +32,15 @@ def __init__(self, data): self._cached_values = [] self._preview_pos = 0 + #pylint: disable=non-iterator-returned def __iter__(self): return self - def next(self): + def __next__(self): self.reset_preview() if len(self._cached_values) > 0: return self._cached_values.pop(0) - else: - return self._it.next() + return next(self._it) def preview(self): """ @@ -61,7 +61,7 @@ def preview(self): if self._preview_pos < len(self._cached_values): value = self._cached_values[self._preview_pos] else: - value = self._it.next() + value = next(self._it) self._cached_values.append(value) self._preview_pos += 1 diff --git a/messaging/mms/message.py b/messaging/mms/message.py index 6cff8a0..1c1d7dc 100644 --- a/messaging/mms/message.py +++ b/messaging/mms/message.py @@ -430,7 +430,7 @@ def set_duration(self, duration): self.duration = duration -class DataPart(object): +class DataPart: """ I am a data entry in the MMS body @@ -544,7 +544,7 @@ def data(self): """A buffer containing the binary data of this part""" if self._data is not None: if type(self._data) == array.array: - self._data = self._data.tostring() + self._data = self._data.tobytes() return self._data elif self._filename is not None: diff --git a/messaging/mms/mms_pdu.py b/messaging/mms/mms_pdu.py index ed44c46..0209ded 100644 --- a/messaging/mms/mms_pdu.py +++ b/messaging/mms/mms_pdu.py @@ -15,8 +15,8 @@ import array import os import random +import logging -from messaging.utils import debug from messaging.mms import message, wsp_pdu from messaging.mms.iterator import PreviewIterator @@ -25,7 +25,7 @@ def flatten_list(x): """Flattens ``x`` into a single list""" result = [] for el in x: - if hasattr(el, "__iter__") and not isinstance(el, basestring): + if hasattr(el, "__iter__") and not isinstance(el, str): result.extend(flatten_list(el)) else: result.append(el) @@ -167,7 +167,7 @@ def decode_message_body(self, data_iter): except StopIteration: return - #print 'Number of data entries (parts) in MMS body:', num_entries + logging.debug('Number of data entries (parts) in MMS body: %i' % num_entries) ########## MMS body: entries ########## # For every data "part", we have to read the following sequence: @@ -175,15 +175,15 @@ def decode_message_body(self, data_iter): # , # , # - for part_num in xrange(num_entries): - #print '\nPart %d:\n------' % part_num + for part_num in range(num_entries): + logging.debug('\nPart %d:\n------' % part_num) headers_len = self.decode_uint_var(data_iter) data_len = self.decode_uint_var(data_iter) # Prepare to read content-type + other possible headers ct_field_bytes = [] - for i in xrange(headers_len): - ct_field_bytes.append(data_iter.next()) + for i in range(headers_len): + ct_field_bytes.append(next(data_iter)) ct_iter = PreviewIterator(ct_field_bytes) # Get content type @@ -201,8 +201,8 @@ def decode_message_body(self, data_iter): # Data (note: this is not null-terminated) data = array.array('B') - for i in xrange(data_len): - data.append(data_iter.next()) + for i in range(data_len): + data.append(next(data_iter)) part = message.DataPart() part.set_data(data, ctype) @@ -269,7 +269,7 @@ def decode_mms_header(byte_iter): byte = wsp_pdu.Decoder.decode_short_integer_from_byte(preview) if byte in mms_field_names: - byte_iter.next() + next(byte_iter) mms_field_name = mms_field_names[byte][0] else: byte_iter.reset_preview() @@ -281,9 +281,9 @@ def decode_mms_header(byte_iter): try: name = mms_field_names[byte][1] mms_value = getattr(MMSDecoder, 'decode_%s' % name)(byte_iter) - except wsp_pdu.DecodeError, msg: + except wsp_pdu.DecodeError as e: raise wsp_pdu.DecodeError('Invalid MMS Header: Could ' - 'not decode MMS-value: %s' % msg) + 'not decode MMS-value: %s' % e) except: raise RuntimeError('A fatal error occurred, probably due to an ' 'unimplemented decoding operation. Tried to ' @@ -316,11 +316,11 @@ def decode_encoded_string_value(byte_iter): # TODO: add proper support for charsets... try: charset = wsp_pdu.Decoder.decode_well_known_charset(byte_iter) - except wsp_pdu.DecodeError, msg: + except wsp_pdu.DecodeError as e: raise Exception('encoded_string_value decoding error - ' - 'Could not decode Charset value: %s' % msg) + 'Could not decode Charset value: %s' % e) - return wsp_pdu.Decoder.decode_text_string(byte_iter) + return wsp_pdu.Decoder.decode_text_string(byte_iter, charset) except wsp_pdu.DecodeError: # Fall back on just "Text-string" return wsp_pdu.Decoder.decode_text_string(byte_iter) @@ -350,13 +350,13 @@ def decode_boolean_value(byte_iter): byte_iter.reset_preview() raise wsp_pdu.DecodeError('Error parsing boolean value ' 'for byte: %s' % hex(byte)) - byte = byte_iter.next() + byte = next(byte_iter) return byte == 128 @staticmethod def decode_delivery_time_value(byte_iter): value_length = wsp_pdu.Decoder.decode_value_length(byte_iter) - token = byte_iter.next() + token = next(byte_iter) value = wsp_pdu.Decoder.decode_long_integer(byte_iter) if token == 128: token_type = 'absolute' @@ -383,7 +383,7 @@ def decode_from_value(byte_iter): """ value_length = wsp_pdu.Decoder.decode_value_length(byte_iter) # See what token we have - byte = byte_iter.next() + byte = next(byte_iter) if byte == 129: # Insert-address-token return '' @@ -416,7 +416,7 @@ def decode_message_class_value(byte_iter): } byte = byte_iter.preview() if byte in class_identifiers: - byte_iter.next() + next(byte_iter) return class_identifiers[byte] byte_iter.reset_preview() @@ -444,7 +444,7 @@ def decode_message_type_value(byte_iter): byte = byte_iter.preview() if byte in message_types: - byte_iter.next() + next(byte_iter) return message_types[byte] byte_iter.reset_preview() @@ -467,7 +467,7 @@ def decode_priority_value(byte_iter): byte = byte_iter.preview() if byte in priorities: - byte = byte_iter.next() + byte = next(byte_iter) return priorities[byte] byte_iter.reset_preview() @@ -498,7 +498,7 @@ def decode_sender_visibility_value(byte_iter): raise wsp_pdu.DecodeError('Error parsing sender visibility ' 'value for byte: %s' % hex(byte)) - byte = byte_iter.next() + byte = next(byte_iter) value = 'Hide' if byte == 128 else 'Show' return value @@ -529,7 +529,7 @@ def decode_response_status_value(byte_iter): 0x88: 'Error-unsupported-message', } byte = byte_iter.preview() - byte_iter.next() + next(byte_iter) # Return error unspecified if it couldn't be decoded return response_status_values.get(byte, 0x81) @@ -555,7 +555,7 @@ def decode_status_value(byte_iter): 0x84: 'Unrecognised', } - byte = byte_iter.next() + byte = next(byte_iter) # Return an unrecognised state if it couldn't be decoded return status_values.get(byte, 0x84) @@ -576,7 +576,7 @@ def decode_expiry_value(byte_iter): :rtype: str or int """ value_length = MMSDecoder.decode_value_length(byte_iter) - token = byte_iter.next() + token = next(byte_iter) if token == 0x80: # Absolute-token return MMSDecoder.decode_date_value(byte_iter) @@ -753,8 +753,7 @@ def encode_message_body(self): for page in self._mms_message._pages: num_entries += page.number_of_parts() - for data_part in self._mms_message._data_parts: - num_entries += 1 + num_entries += len(self._mms_message._data_parts) message_body.extend(self.encode_uint_var(num_entries)) @@ -844,11 +843,11 @@ def encode_header(header_field_name, header_value): ret = getattr(MMSEncoder, 'encode_%s' % expected_type)(header_value) encoded_header.extend(ret) - except wsp_pdu.EncodeError, msg: + except wsp_pdu.EncodeError as e: raise wsp_pdu.EncodeError('Error encoding parameter ' - 'value: %s' % msg) + 'value: %s' % e) except: - debug('A fatal error occurred, probably due to an ' + logging.error('A fatal error occurred, probably due to an ' 'unimplemented encoding operation') raise diff --git a/messaging/mms/wsp_pdu.py b/messaging/mms/wsp_pdu.py index 7aa1eae..3100efe 100644 --- a/messaging/mms/wsp_pdu.py +++ b/messaging/mms/wsp_pdu.py @@ -44,11 +44,11 @@ import array from datetime import datetime +import logging -from messaging.utils import debug from messaging.mms.iterator import PreviewIterator -wsp_pdu_types = { +WSP_PDU_TYPES = { 0x01: 'Connect', 0x02: 'ConnectReply', 0x03: 'Redirect', @@ -63,7 +63,7 @@ } # Well-known parameter assignments ([5], table 38) -well_known_parameters = { +WELL_KNOWN_PARAMETERS = { 0x00: ('Q', 'q_value'), 0x01: ('Charset', 'well_known_charset'), 0x02: ('Level', 'version_value'), @@ -97,7 +97,7 @@ # Content type assignments ([5], table 40) -well_known_content_types = [ +WELL_KNOWN_CONTENT_TYPES = [ '*/*', 'text/*', 'text/html', 'text/plain', 'text/x-hdml', 'text/x-ttml', 'text/x-vCalendar', 'text/x-vCard', 'text/vnd.wap.wml', @@ -164,7 +164,7 @@ # Note that the assigned number is the same as the IANA MIBEnum value # "gsm-default-alphabet" is not included, as it is not assigned any # value in [5]. Also note, this is by no means a complete list -well_known_charsets = { +WELL_KNOWN_CHARSETS = { 0x07EA: 'big5', 0x03E8: 'iso-10646-ucs-2', 0x04: 'iso-8859-1', @@ -182,7 +182,7 @@ } # Header Field Name assignments ([5], table 39) -header_field_names = [ +HEADER_FIELD_NAMES = [ 'Accept', 'Accept-Charset', 'Accept-Encoding', 'Accept-Language', 'Accept-Ranges', 'Age', 'Allow', 'Authorization', 'Cache-Control', @@ -214,11 +214,11 @@ ] -# TODO: combine this dict with the header_field_names table (same as well +# TODO: combine this dict with the HEADER_FIELD_NAMES table (same as well # known parameter assignments) # Temporary fix to allow different types of header field values to be # dynamically decoded -header_field_encodings = {'Accept': 'accept_value', 'Pragma': 'pragma_value'} +HEADER_FIELD_ENCODINGS = {'Accept': 'accept_value', 'Pragma': 'pragma_value'} def get_header_field_names(version='1.2'): @@ -243,7 +243,7 @@ def get_header_field_names(version='1.2'): version = int(version.split('.')[1]) - versioned_field_names = header_field_names[:] + versioned_field_names = HEADER_FIELD_NAMES[:] if version == 3: versioned_field_names = versioned_field_names[:0x44] elif version == 2: @@ -281,7 +281,7 @@ def get_well_known_parameters(version='1.2'): else: version = int(version.split('.')[1]) - versioned_params = well_known_parameters.copy() + versioned_params = WELL_KNOWN_PARAMETERS.copy() if version <= 3: for assigned_number in range(0x11, 0x1e): del versioned_params[assigned_number] @@ -331,7 +331,7 @@ def decode_uint_8(byte_iter): :rtype: int """ # Make the byte unsigned - return byte_iter.next() & 0xf + return next(byte_iter) & 0xf @staticmethod def decode_uint_var(byte_iter): @@ -353,11 +353,11 @@ def decode_uint_var(byte_iter): :rtype: int """ uint = 0 - byte = byte_iter.next() + byte = next(byte_iter) while (byte >> 7) == 0x01: uint = uint << 7 uint |= byte & 0x7f - byte = byte_iter.next() + byte = next(byte_iter) uint = uint << 7 uint |= byte & 0x7f @@ -388,7 +388,7 @@ def decode_short_integer(byte_iter): byte_iter.reset_preview() raise DecodeError('Not a valid short-integer: MSB not set') - byte = byte_iter.next() + byte = next(byte_iter) return byte & 0x7f @staticmethod @@ -447,14 +447,14 @@ def decode_long_integer(byte_iter): longInt = 0 # Decode the Multi-octect-integer - for i in xrange(shortLength): + for i in range(shortLength): longInt = longInt << 8 - longInt |= byte_iter.next() + longInt |= next(byte_iter) return longInt @staticmethod - def decode_text_string(byte_iter): + def decode_text_string(byte_iter, encoding = 'utf-8'): """ Decodes the null-terminated, binary-encoded string value starting at the byte pointed to by ``byte_iter``. @@ -473,17 +473,22 @@ def decode_text_string(byte_iter): :return: The decoded text string :rtype: str """ - decoded_string = '' - byte = byte_iter.next() + b_decoded_string = b'' + byte = next(byte_iter) # Remove Quote character (octet 127), if present if byte == 127: - byte = byte_iter.next() + byte = next(byte_iter) while byte != 0x00: - decoded_string += chr(byte) - byte = byte_iter.next() + b_decoded_string += bytes([byte]) + byte = next(byte_iter) - return decoded_string + try: + # Lets try to decode it to the given encoding + # if that fails we probably have characters that need to be escaped + return b_decoded_string.decode(encoding) + except UnicodeError: + return b_decoded_string.decode("unicode_escape") @staticmethod def decode_quoted_string(byte_iter): @@ -505,7 +510,7 @@ def decode_quoted_string(byte_iter): raise DecodeError('Invalid quoted string: must ' 'start with ') - byte_iter.next() + next(byte_iter) # CHECK: should the quotation chars be pre- and appended before # returning *technically* we should not check for quote characters. return Decoder.decode_text_string(byte_iter) @@ -528,10 +533,10 @@ def decode_token_text(byte_iter): byte_iter.reset_preview() raise DecodeError('Invalid token') - byte = byte_iter.next() + byte = next(byte_iter) while byte > 31 and byte not in separators: token += chr(byte) - byte = byte_iter.next() + byte = next(byte_iter) return token @@ -561,10 +566,10 @@ def decode_extension_media(byte_iter): raise DecodeError('Invalid Extension-media: TEXT ' 'starts with invalid character: %d' % byte) - byte = byte_iter.next() + byte = next(byte_iter) while byte != 0x00: media_value += chr(byte) - byte = byte_iter.next() + byte = next(byte_iter) return media_value @@ -614,7 +619,7 @@ def decode_short_length(byte_iter): raise DecodeError('Not a valid short-length: ' 'should be in octet range 0-30') - return byte_iter.next() + return next(byte_iter) @staticmethod def decode_value_length(byte_iter): @@ -646,7 +651,7 @@ def decode_value_length(byte_iter): byte = byte_iter.preview() # CHECK: this strictness MAY cause issues, but it is correct if byte == 31: - byte_iter.next() # skip past the length-quote + next(byte_iter) # skip past the length-quote length_value = Decoder.decode_uint_var(byte_iter) else: byte_iter.reset_preview() @@ -759,7 +764,7 @@ def decode_well_known_media(byte_iter): 'integer value representing it') try: - return well_known_content_types[value] + return WELL_KNOWN_CONTENT_TYPES[value] except IndexError: raise DecodeError('Invalid well-known media: could not ' 'find content type in table of assigned values') @@ -804,12 +809,12 @@ def decode_constrained_media(byte_iter): """ try: media_value = Decoder.decode_constrained_encoding(byte_iter) - except DecodeError, msg: - raise DecodeError('Invalid Constrained-media: %s' % msg) + except DecodeError as e: + raise DecodeError('Invalid Constrained-media: %s' % e) if isinstance(media_value, int): try: - return well_known_content_types[media_value] + return WELL_KNOWN_CONTENT_TYPES[media_value] except IndexError: raise DecodeError('Invalid constrained media: could not ' 'find well-known content type') @@ -843,8 +848,8 @@ def decode_content_general_form(byte_iter): # Read parameters, etc, until is reached ct_field_bytes = array.array('B') - for i in xrange(value_length): - ct_field_bytes.append(byte_iter.next()) + for i in range(value_length): + ct_field_bytes.append(next(byte_iter)) ct_iter = PreviewIterator(ct_field_bytes) # Now, decode all the bytes read @@ -898,10 +903,10 @@ def decode_typed_parameter(byte_iter): typed_value = '' try: typed_value = getattr(Decoder, 'decode_%s' % value_type)(byte_iter) - except DecodeError, msg: - raise DecodeError('Could not decode Typed-parameter: %s' % msg) + except DecodeError as e: + raise DecodeError('Could not decode Typed-parameter: %s' % e) except: - debug('A fatal error occurred, probably due to an ' + logging.error('A fatal error occurred, probably due to an ' 'unimplemented decoding operation') raise @@ -1186,11 +1191,11 @@ def decode_no_value(byte_iter): :return: No-value, which is 0x00 :rtype: int """ - byte_iter, local_iter = byte_iter.next() - if local_iter.next() != 0x00: + byte_iter, local_iter = next(byte_iter) + if next(local_iter) != 0x00: raise DecodeError('Expected No-value') - byte_iter.next() + next(byte_iter) return 0x00 @staticmethod @@ -1226,7 +1231,7 @@ def decode_accept_value(byte_iter): # Check for the Q-Token (to see if there are Accept-parameters) if byte_iter.preview() == 128: - byte_iter.next() + next(byte_iter) q_value = Decoder.decode_q_value(byte_iter) try: accept_extension = Decoder.decode_parameter(byte_iter) @@ -1260,7 +1265,7 @@ def decode_pragma_value(byte_iter): """ byte = byte_iter.preview() if byte == 0x80: # No-cache - byte_iter.next() + next(byte_iter) # TODO: Not sure if this parameter name (or even usage) is correct name, value = 'Cache-control', 'No-cache' else: @@ -1286,12 +1291,12 @@ def decode_well_known_charset(byte_iter): byte = byte_iter.preview() byte_iter.reset_preview() if byte == 127: - byte_iter.next() + next(byte_iter) decoded_charset = '*' else: charset_value = Decoder.decode_integer_value(byte_iter) - if charset_value in well_known_charsets: - decoded_charset = well_known_charsets[charset_value] + if charset_value in WELL_KNOWN_CHARSETS: + decoded_charset = WELL_KNOWN_CHARSETS[charset_value] else: # This charset is not in our table... so just use the # value (at least for now) @@ -1319,7 +1324,7 @@ def decode_well_known_header(byte_iter): hdr_fields = get_header_field_names() # TODO: *technically* this can fail, but then we have already # read a byte... should fix? - if field_value not in xrange(len(hdr_fields)): + if field_value not in list(range(len(hdr_fields))): raise DecodeError('Invalid Header Field value: %d' % field_value) field_name = hdr_fields[field_value] @@ -1328,15 +1333,15 @@ def decode_well_known_header(byte_iter): # decode_application_header also # Currently we decode most headers as text_strings, except # where we have a specific decoding algorithm implemented - if field_name in header_field_encodings: - wap_value_type = header_field_encodings[field_name] + if field_name in HEADER_FIELD_ENCODINGS: + wap_value_type = HEADER_FIELD_ENCODINGS[field_name] try: decoded_value = getattr(Decoder, 'decode_%s' % wap_value_type)(byte_iter) - except DecodeError, msg: - raise DecodeError('Could not decode Wap-value: %s' % msg) + except DecodeError as e: + raise DecodeError('Could not decode Wap-value: %s' % e) except: - debug('An error occurred, probably due to an ' + logging.error('An error occurred, probably due to an ' 'unimplemented decoding operation. Tried to ' 'decode header: %s' % field_name) raise @@ -1372,6 +1377,7 @@ def decode_application_header(byte_iter): app_header = Decoder.decode_text_string(byte_iter) app_specific_value = Decoder.decode_text_string(byte_iter) + return app_header, app_specific_value @staticmethod @@ -1606,10 +1612,10 @@ def encode_media_type(content_type): values :rtype: list """ - if content_type in well_known_content_types: + if content_type in WELL_KNOWN_CONTENT_TYPES: # Short-integer encoding val = Encoder.encode_short_integer( - well_known_content_types.index(content_type)) + WELL_KNOWN_CONTENT_TYPES.index(content_type)) else: val = Encoder.encode_text_string(content_type) @@ -1664,10 +1670,10 @@ def encode_parameter(parameter_name, parameter_value, version='1.2'): ret = getattr(Encoder, 'encode_%s' % expected_type)(parameter_value) encoded_parameter.extend(ret) - except EncodeError, msg: - raise EncodeError('Error encoding param value: %s' % msg) + except EncodeError as e: + raise EncodeError('Error encoding param value: %s' % e) except: - debug('A fatal error occurred, probably due to an ' + logging.error('A fatal error occurred, probably due to an ' 'unimplemented encoding operation') raise break @@ -1794,15 +1800,15 @@ def encode_header(field_name, value): # TODO: make this flow better (see also Decoder.decode_header) # most header values are encoded as text_strings, except where we # have a specific Wap-value encoding implementation - if field_name in header_field_encodings: - wap_value_type = header_field_encodings[field_name] + if field_name in HEADER_FIELD_ENCODINGS: + wap_value_type = HEADER_FIELD_ENCODINGS[field_name] try: ret = getattr(Encoder, 'encode_%s' % wap_value_type)(value) encoded_header.extend(ret) - except EncodeError, msg: - raise EncodeError('Error encoding Wap-value: %s' % msg) + except EncodeError as e: + raise EncodeError('Error encoding Wap-value: %s' % e) except: - debug('A fatal error occurred, probably due to an ' + logging.error('A fatal error occurred, probably due to an ' 'unimplemented encoding operation') raise else: @@ -1858,8 +1864,8 @@ def encode_constrained_media(media_type): :rtype: list """ # See if this value is in the table of well-known content types - if media_type in well_known_content_types: - value = well_known_content_types.index(media_type) + if media_type in WELL_KNOWN_CONTENT_TYPES: + value = WELL_KNOWN_CONTENT_TYPES.index(media_type) else: value = media_type @@ -1920,7 +1926,7 @@ def encode_extension_media(media_value): :return: The encoded media type value, as a sequence of bytes :rtype: str """ - if not isinstance(media_value, basestring): + if not isinstance(media_value, str): try: media_value = str(media_value) except: @@ -2045,8 +2051,8 @@ def encode_accept_value(accept_value): # ...now try Accept-general-form try: encoded_media_range = Encoder.encode_media_type(accept_value) - except EncodeError, msg: - raise EncodeError('Cannot encode Accept-value: %s' % msg) + except EncodeError as e: + raise EncodeError('Cannot encode Accept-value: %s' % e) value_length = Encoder.encode_value_length(len(encoded_media_range)) encoded_accept_value = value_length diff --git a/messaging/sms/__init__.py b/messaging/sms/__init__.py index f00ad10..d619a85 100644 --- a/messaging/sms/__init__.py +++ b/messaging/sms/__init__.py @@ -2,6 +2,6 @@ from messaging.sms.submit import SmsSubmit from messaging.sms.deliver import SmsDeliver -from messaging.sms.gsm0338 import is_gsm_text +from messaging.sms.gsm0338 import is_valid_gsm -__all__ = ["SmsSubmit", "SmsDeliver", "is_gsm_text"] +__all__ = ["SmsSubmit", "SmsDeliver", "is_valid_gsm"] diff --git a/messaging/sms/base.py b/messaging/sms/base.py index b7b1d59..ba09cf6 100644 --- a/messaging/sms/base.py +++ b/messaging/sms/base.py @@ -1,7 +1,7 @@ # see LICENSE -class SmsBase(object): +class SmsBase: def __init__(self): self.udh = None diff --git a/messaging/sms/deliver.py b/messaging/sms/deliver.py index 5a9ba6b..b59529a 100644 --- a/messaging/sms/deliver.py +++ b/messaging/sms/deliver.py @@ -2,9 +2,10 @@ """Classes for processing received SMS""" from datetime import datetime, timedelta +import logging -from messaging.utils import (swap, swap_number, encode_bytes, debug, - unpack_msg, unpack_msg2, to_array) +from messaging.utils import (swap, swap_number, encode_bytes, + unpack_msg, hex_to_int_array) from messaging.sms import consts from messaging.sms.base import SmsBase from messaging.sms.udh import UserDataHeader @@ -65,7 +66,7 @@ def _set_pdu(self, pdu): # XXX: Should we keep the original PDU or the modified one? self._pdu = pdu - data = to_array(self._pdu) + data = hex_to_int_array(self._pdu) # Service centre address smscl = data.pop(0) @@ -106,7 +107,7 @@ def _set_pdu(self, pdu): sndtype = (data.pop(0) >> 4) & 0x07 if sndtype == consts.ALPHANUMERIC: # coded according to 3GPP TS 23.038 [9] GSM 7-bit default alphabet - sender = unpack_msg2(data[:sndlen]).decode("gsm0338") + sender = unpack_msg(data[:sndlen]).decode("gsm0338") else: # Extract phone number of sender sender = swap_number(encode_bytes(data[:sndlen])) @@ -172,19 +173,16 @@ def _process_message(self, data): headlen = int(headlen) if self.fmt == 0x00: - # XXX: Use unpack_msg2 - data = data[ud_len:].tolist() - #self.text = unpack_msg2(data).decode("gsm0338") self.text = unpack_msg(msg)[headlen:msgl].decode("gsm0338") elif self.fmt == 0x04: - self.text = data[ud_len:].tostring() + self.text = data[ud_len:].tobytes() elif self.fmt == 0x08: data = data[ud_len:].tolist() _bytes = [int("%02X%02X" % (data[i], data[i + 1]), 16) for i in range(0, len(data), 2)] - self.text = u''.join(list(map(unichr, _bytes))) + self.text = ''.join(list(map(chr, _bytes))) pdu = property(lambda self: self._pdu, _set_pdu) @@ -209,7 +207,7 @@ def _decode_status_report_pdu(self, data): self.date = datetime.strptime(scts_str, "%y/%m/%d %H:%M:%S") except (ValueError, TypeError): scts_str = '' - debug('Could not decode scts: %s' % date) + logging.debug('Could not decode scts: %s' % date) data = data[7:] @@ -220,7 +218,7 @@ def _decode_status_report_pdu(self, data): except (ValueError, TypeError): dt_str = '' dt = None - debug('Could not decode date: %s' % date) + logging.debug('Could not decode date: %s' % date) data = data[7:] diff --git a/messaging/sms/gsm0338.py b/messaging/sms/gsm0338.py index 32a50b5..dc1243f 100644 --- a/messaging/sms/gsm0338.py +++ b/messaging/sms/gsm0338.py @@ -12,281 +12,286 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# Refactored using cleaner code from +# https://github.com/jezeniel/smsutil/blob/master/smsutil/codecs.py import codecs -import sys -import traceback - -# data from -# http://snoops.roy202.org/testerman/browser/trunk/plugins/codecs/gsm0338.py +from array import array +import re # default GSM 03.38 -> unicode -def_regular_decode_dict = { - '\x00': u'\u0040', # COMMERCIAL AT - '\x01': u'\u00A3', # POUND SIGN - '\x02': u'\u0024', # DOLLAR SIGN - '\x03': u'\u00A5', # YEN SIGN - '\x04': u'\u00E8', # LATIN SMALL LETTER E WITH GRAVE - '\x05': u'\u00E9', # LATIN SMALL LETTER E WITH ACUTE - '\x06': u'\u00F9', # LATIN SMALL LETTER U WITH GRAVE - '\x07': u'\u00EC', # LATIN SMALL LETTER I WITH GRAVE - '\x08': u'\u00F2', # LATIN SMALL LETTER O WITH GRAVE - '\x09': u'\u00C7', # LATIN CAPITAL LETTER C WITH CEDILLA +GSM_BASIC_CHARSET = { + '\x00': '\u0040', # COMMERCIAL AT + '\x01': '\u00A3', # POUND SIGN + '\x02': '\u0024', # DOLLAR SIGN + '\x03': '\u00A5', # YEN SIGN + '\x04': '\u00E8', # LATIN SMALL LETTER E WITH GRAVE + '\x05': '\u00E9', # LATIN SMALL LETTER E WITH ACUTE + '\x06': '\u00F9', # LATIN SMALL LETTER U WITH GRAVE + '\x07': '\u00EC', # LATIN SMALL LETTER I WITH GRAVE + '\x08': '\u00F2', # LATIN SMALL LETTER O WITH GRAVE + '\x09': '\u00C7', # LATIN CAPITAL LETTER C WITH CEDILLA # The Unicode page suggests this is a mistake: but # it's still in the latest version of the spec and # our implementation has to be exact. - '\x0A': u'\u000A', # LINE FEED - '\x0B': u'\u00D8', # LATIN CAPITAL LETTER O WITH STROKE - '\x0C': u'\u00F8', # LATIN SMALL LETTER O WITH STROKE - '\x0D': u'\u000D', # CARRIAGE RETURN - '\x0E': u'\u00C5', # LATIN CAPITAL LETTER A WITH RING ABOVE - '\x0F': u'\u00E5', # LATIN SMALL LETTER A WITH RING ABOVE - '\x10': u'\u0394', # GREEK CAPITAL LETTER DELTA - '\x11': u'\u005F', # LOW LINE - '\x12': u'\u03A6', # GREEK CAPITAL LETTER PHI - '\x13': u'\u0393', # GREEK CAPITAL LETTER GAMMA - '\x14': u'\u039B', # GREEK CAPITAL LETTER LAMDA - '\x15': u'\u03A9', # GREEK CAPITAL LETTER OMEGA - '\x16': u'\u03A0', # GREEK CAPITAL LETTER PI - '\x17': u'\u03A8', # GREEK CAPITAL LETTER PSI - '\x18': u'\u03A3', # GREEK CAPITAL LETTER SIGMA - '\x19': u'\u0398', # GREEK CAPITAL LETTER THETA - '\x1A': u'\u039E', # GREEK CAPITAL LETTER XI - '\x1C': u'\u00C6', # LATIN CAPITAL LETTER AE - '\x1D': u'\u00E6', # LATIN SMALL LETTER AE - '\x1E': u'\u00DF', # LATIN SMALL LETTER SHARP S (German) - '\x1F': u'\u00C9', # LATIN CAPITAL LETTER E WITH ACUTE - '\x20': u'\u0020', # SPACE - '\x21': u'\u0021', # EXCLAMATION MARK - '\x22': u'\u0022', # QUOTATION MARK - '\x23': u'\u0023', # NUMBER SIGN - '\x24': u'\u00A4', # CURRENCY SIGN - '\x25': u'\u0025', # PERCENT SIGN - '\x26': u'\u0026', # AMPERSAND - '\x27': u'\u0027', # APOSTROPHE - '\x28': u'\u0028', # LEFT PARENTHESIS - '\x29': u'\u0029', # RIGHT PARENTHESIS - '\x2A': u'\u002A', # ASTERISK - '\x2B': u'\u002B', # PLUS SIGN - '\x2C': u'\u002C', # COMMA - '\x2D': u'\u002D', # HYPHEN-MINUS - '\x2E': u'\u002E', # FULL STOP - '\x2F': u'\u002F', # SOLIDUS - '\x30': u'\u0030', # DIGIT ZERO - '\x31': u'\u0031', # DIGIT ONE - '\x32': u'\u0032', # DIGIT TWO - '\x33': u'\u0033', # DIGIT THREE - '\x34': u'\u0034', # DIGIT FOUR - '\x35': u'\u0035', # DIGIT FIVE - '\x36': u'\u0036', # DIGIT SIX - '\x37': u'\u0037', # DIGIT SEVEN - '\x38': u'\u0038', # DIGIT EIGHT - '\x39': u'\u0039', # DIGIT NINE - '\x3A': u'\u003A', # COLON - '\x3B': u'\u003B', # SEMICOLON - '\x3C': u'\u003C', # LESS-THAN SIGN - '\x3D': u'\u003D', # EQUALS SIGN - '\x3E': u'\u003E', # GREATER-THAN SIGN - '\x3F': u'\u003F', # QUESTION MARK - '\x40': u'\u00A1', # INVERTED EXCLAMATION MARK - '\x41': u'\u0041', # LATIN CAPITAL LETTER A - '\x42': u'\u0042', # LATIN CAPITAL LETTER B - '\x43': u'\u0043', # LATIN CAPITAL LETTER C - '\x44': u'\u0044', # LATIN CAPITAL LETTER D - '\x45': u'\u0045', # LATIN CAPITAL LETTER E - '\x46': u'\u0046', # LATIN CAPITAL LETTER F - '\x47': u'\u0047', # LATIN CAPITAL LETTER G - '\x48': u'\u0048', # LATIN CAPITAL LETTER H - '\x49': u'\u0049', # LATIN CAPITAL LETTER I - '\x4A': u'\u004A', # LATIN CAPITAL LETTER J - '\x4B': u'\u004B', # LATIN CAPITAL LETTER K - '\x4C': u'\u004C', # LATIN CAPITAL LETTER L - '\x4D': u'\u004D', # LATIN CAPITAL LETTER M - '\x4E': u'\u004E', # LATIN CAPITAL LETTER N - '\x4F': u'\u004F', # LATIN CAPITAL LETTER O - '\x50': u'\u0050', # LATIN CAPITAL LETTER P - '\x51': u'\u0051', # LATIN CAPITAL LETTER Q - '\x52': u'\u0052', # LATIN CAPITAL LETTER R - '\x53': u'\u0053', # LATIN CAPITAL LETTER S - '\x54': u'\u0054', # LATIN CAPITAL LETTER T - '\x55': u'\u0055', # LATIN CAPITAL LETTER U - '\x56': u'\u0056', # LATIN CAPITAL LETTER V - '\x57': u'\u0057', # LATIN CAPITAL LETTER W - '\x58': u'\u0058', # LATIN CAPITAL LETTER X - '\x59': u'\u0059', # LATIN CAPITAL LETTER Y - '\x5A': u'\u005A', # LATIN CAPITAL LETTER Z - '\x5B': u'\u00C4', # LATIN CAPITAL LETTER A WITH DIAERESIS - '\x5C': u'\u00D6', # LATIN CAPITAL LETTER O WITH DIAERESIS - '\x5D': u'\u00D1', # LATIN CAPITAL LETTER N WITH TILDE - '\x5E': u'\u00DC', # LATIN CAPITAL LETTER U WITH DIAERESIS - '\x5F': u'\u00A7', # SECTION SIGN - '\x60': u'\u00BF', # INVERTED QUESTION MARK - '\x61': u'\u0061', # LATIN SMALL LETTER A - '\x62': u'\u0062', # LATIN SMALL LETTER B - '\x63': u'\u0063', # LATIN SMALL LETTER C - '\x64': u'\u0064', # LATIN SMALL LETTER D - '\x65': u'\u0065', # LATIN SMALL LETTER E - '\x66': u'\u0066', # LATIN SMALL LETTER F - '\x67': u'\u0067', # LATIN SMALL LETTER G - '\x68': u'\u0068', # LATIN SMALL LETTER H - '\x69': u'\u0069', # LATIN SMALL LETTER I - '\x6A': u'\u006A', # LATIN SMALL LETTER J - '\x6B': u'\u006B', # LATIN SMALL LETTER K - '\x6C': u'\u006C', # LATIN SMALL LETTER L - '\x6D': u'\u006D', # LATIN SMALL LETTER M - '\x6E': u'\u006E', # LATIN SMALL LETTER N - '\x6F': u'\u006F', # LATIN SMALL LETTER O - '\x70': u'\u0070', # LATIN SMALL LETTER P - '\x71': u'\u0071', # LATIN SMALL LETTER Q - '\x72': u'\u0072', # LATIN SMALL LETTER R - '\x73': u'\u0073', # LATIN SMALL LETTER S - '\x74': u'\u0074', # LATIN SMALL LETTER T - '\x75': u'\u0075', # LATIN SMALL LETTER U - '\x76': u'\u0076', # LATIN SMALL LETTER V - '\x77': u'\u0077', # LATIN SMALL LETTER W - '\x78': u'\u0078', # LATIN SMALL LETTER X - '\x79': u'\u0079', # LATIN SMALL LETTER Y - '\x7A': u'\u007A', # LATIN SMALL LETTER Z - '\x7B': u'\u00E4', # LATIN SMALL LETTER A WITH DIAERESIS - '\x7C': u'\u00F6', # LATIN SMALL LETTER O WITH DIAERESIS - '\x7D': u'\u00F1', # LATIN SMALL LETTER N WITH TILDE - '\x7E': u'\u00FC', # LATIN SMALL LETTER U WITH DIAERESIS - '\x7F': u'\u00E0', # LATIN SMALL LETTER A WITH GRAVE + '\x0A': '\u000A', # LINE FEED + '\x0B': '\u00D8', # LATIN CAPITAL LETTER O WITH STROKE + '\x0C': '\u00F8', # LATIN SMALL LETTER O WITH STROKE + '\x0D': '\u000D', # CARRIAGE RETURN + '\x0E': '\u00C5', # LATIN CAPITAL LETTER A WITH RING ABOVE + '\x0F': '\u00E5', # LATIN SMALL LETTER A WITH RING ABOVE + '\x10': '\u0394', # GREEK CAPITAL LETTER DELTA + '\x11': '\u005F', # LOW LINE + '\x12': '\u03A6', # GREEK CAPITAL LETTER PHI + '\x13': '\u0393', # GREEK CAPITAL LETTER GAMMA + '\x14': '\u039B', # GREEK CAPITAL LETTER LAMDA + '\x15': '\u03A9', # GREEK CAPITAL LETTER OMEGA + '\x16': '\u03A0', # GREEK CAPITAL LETTER PI + '\x17': '\u03A8', # GREEK CAPITAL LETTER PSI + '\x18': '\u03A3', # GREEK CAPITAL LETTER SIGMA + '\x19': '\u0398', # GREEK CAPITAL LETTER THETA + '\x1A': '\u039E', # GREEK CAPITAL LETTER XI + '\x1C': '\u00C6', # LATIN CAPITAL LETTER AE + '\x1D': '\u00E6', # LATIN SMALL LETTER AE + '\x1E': '\u00DF', # LATIN SMALL LETTER SHARP S (German) + '\x1F': '\u00C9', # LATIN CAPITAL LETTER E WITH ACUTE + '\x20': '\u0020', # SPACE + '\x21': '\u0021', # EXCLAMATION MARK + '\x22': '\u0022', # QUOTATION MARK + '\x23': '\u0023', # NUMBER SIGN + '\x24': '\u00A4', # CURRENCY SIGN + '\x25': '\u0025', # PERCENT SIGN + '\x26': '\u0026', # AMPERSAND + '\x27': '\u0027', # APOSTROPHE + '\x28': '\u0028', # LEFT PARENTHESIS + '\x29': '\u0029', # RIGHT PARENTHESIS + '\x2A': '\u002A', # ASTERISK + '\x2B': '\u002B', # PLUS SIGN + '\x2C': '\u002C', # COMMA + '\x2D': '\u002D', # HYPHEN-MINUS + '\x2E': '\u002E', # FULL STOP + '\x2F': '\u002F', # SOLIDUS + '\x30': '\u0030', # DIGIT ZERO + '\x31': '\u0031', # DIGIT ONE + '\x32': '\u0032', # DIGIT TWO + '\x33': '\u0033', # DIGIT THREE + '\x34': '\u0034', # DIGIT FOUR + '\x35': '\u0035', # DIGIT FIVE + '\x36': '\u0036', # DIGIT SIX + '\x37': '\u0037', # DIGIT SEVEN + '\x38': '\u0038', # DIGIT EIGHT + '\x39': '\u0039', # DIGIT NINE + '\x3A': '\u003A', # COLON + '\x3B': '\u003B', # SEMICOLON + '\x3C': '\u003C', # LESS-THAN SIGN + '\x3D': '\u003D', # EQUALS SIGN + '\x3E': '\u003E', # GREATER-THAN SIGN + '\x3F': '\u003F', # QUESTION MARK + '\x40': '\u00A1', # INVERTED EXCLAMATION MARK + '\x41': '\u0041', # LATIN CAPITAL LETTER A + '\x42': '\u0042', # LATIN CAPITAL LETTER B + '\x43': '\u0043', # LATIN CAPITAL LETTER C + '\x44': '\u0044', # LATIN CAPITAL LETTER D + '\x45': '\u0045', # LATIN CAPITAL LETTER E + '\x46': '\u0046', # LATIN CAPITAL LETTER F + '\x47': '\u0047', # LATIN CAPITAL LETTER G + '\x48': '\u0048', # LATIN CAPITAL LETTER H + '\x49': '\u0049', # LATIN CAPITAL LETTER I + '\x4A': '\u004A', # LATIN CAPITAL LETTER J + '\x4B': '\u004B', # LATIN CAPITAL LETTER K + '\x4C': '\u004C', # LATIN CAPITAL LETTER L + '\x4D': '\u004D', # LATIN CAPITAL LETTER M + '\x4E': '\u004E', # LATIN CAPITAL LETTER N + '\x4F': '\u004F', # LATIN CAPITAL LETTER O + '\x50': '\u0050', # LATIN CAPITAL LETTER P + '\x51': '\u0051', # LATIN CAPITAL LETTER Q + '\x52': '\u0052', # LATIN CAPITAL LETTER R + '\x53': '\u0053', # LATIN CAPITAL LETTER S + '\x54': '\u0054', # LATIN CAPITAL LETTER T + '\x55': '\u0055', # LATIN CAPITAL LETTER U + '\x56': '\u0056', # LATIN CAPITAL LETTER V + '\x57': '\u0057', # LATIN CAPITAL LETTER W + '\x58': '\u0058', # LATIN CAPITAL LETTER X + '\x59': '\u0059', # LATIN CAPITAL LETTER Y + '\x5A': '\u005A', # LATIN CAPITAL LETTER Z + '\x5B': '\u00C4', # LATIN CAPITAL LETTER A WITH DIAERESIS + '\x5C': '\u00D6', # LATIN CAPITAL LETTER O WITH DIAERESIS + '\x5D': '\u00D1', # LATIN CAPITAL LETTER N WITH TILDE + '\x5E': '\u00DC', # LATIN CAPITAL LETTER U WITH DIAERESIS + '\x5F': '\u00A7', # SECTION SIGN + '\x60': '\u00BF', # INVERTED QUESTION MARK + '\x61': '\u0061', # LATIN SMALL LETTER A + '\x62': '\u0062', # LATIN SMALL LETTER B + '\x63': '\u0063', # LATIN SMALL LETTER C + '\x64': '\u0064', # LATIN SMALL LETTER D + '\x65': '\u0065', # LATIN SMALL LETTER E + '\x66': '\u0066', # LATIN SMALL LETTER F + '\x67': '\u0067', # LATIN SMALL LETTER G + '\x68': '\u0068', # LATIN SMALL LETTER H + '\x69': '\u0069', # LATIN SMALL LETTER I + '\x6A': '\u006A', # LATIN SMALL LETTER J + '\x6B': '\u006B', # LATIN SMALL LETTER K + '\x6C': '\u006C', # LATIN SMALL LETTER L + '\x6D': '\u006D', # LATIN SMALL LETTER M + '\x6E': '\u006E', # LATIN SMALL LETTER N + '\x6F': '\u006F', # LATIN SMALL LETTER O + '\x70': '\u0070', # LATIN SMALL LETTER P + '\x71': '\u0071', # LATIN SMALL LETTER Q + '\x72': '\u0072', # LATIN SMALL LETTER R + '\x73': '\u0073', # LATIN SMALL LETTER S + '\x74': '\u0074', # LATIN SMALL LETTER T + '\x75': '\u0075', # LATIN SMALL LETTER U + '\x76': '\u0076', # LATIN SMALL LETTER V + '\x77': '\u0077', # LATIN SMALL LETTER W + '\x78': '\u0078', # LATIN SMALL LETTER X + '\x79': '\u0079', # LATIN SMALL LETTER Y + '\x7A': '\u007A', # LATIN SMALL LETTER Z + '\x7B': '\u00E4', # LATIN SMALL LETTER A WITH DIAERESIS + '\x7C': '\u00F6', # LATIN SMALL LETTER O WITH DIAERESIS + '\x7D': '\u00F1', # LATIN SMALL LETTER N WITH TILDE + '\x7E': '\u00FC', # LATIN SMALL LETTER U WITH DIAERESIS + '\x7F': '\u00E0', # LATIN SMALL LETTER A WITH GRAVE } # default GSM 03.38 escaped characters -> unicode -def_escape_decode_dict = { - '\x0A': u'\u000C', # FORM FEED - '\x14': u'\u005E', # CIRCUMFLEX ACCENT - '\x28': u'\u007B', # LEFT CURLY BRACKET - '\x29': u'\u007D', # RIGHT CURLY BRACKET - '\x2F': u'\u005C', # REVERSE SOLIDUS - '\x3C': u'\u005B', # LEFT SQUARE BRACKET - '\x3D': u'\u007E', # TILDE - '\x3E': u'\u005D', # RIGHT SQUARE BRACKET - '\x40': u'\u007C', # VERTICAL LINE - '\x65': u'\u20AC', # EURO SIGN +GSM_EXT_CHARSET = { + '\x1B\x0A': '\u000C', # FORM FEED + '\x1B\x14': '\u005E', # CIRCUMFLEX ACCENT + '\x1B\x28': '\u007B', # LEFT CURLY BRACKET + '\x1B\x29': '\u007D', # RIGHT CURLY BRACKET + '\x1B\x2F': '\u005C', # REVERSE SOLIDUS + '\x1B\x3C': '\u005B', # LEFT SQUARE BRACKET + '\x1B\x3D': '\u007E', # TILDE + '\x1B\x3E': '\u005D', # RIGHT SQUARE BRACKET + '\x1B\x40': '\u007C', # VERTICAL LINE + '\x1B\x65': '\u20AC', # EURO SIGN } # Replacement characters, default is question mark. Used when it is not too # important to ensure exact UTF-8 -> GSM -> UTF-8 equivilence, such as when # humans read and write SMS. But for USSD and other M2M applications it's # important to ensure the conversion is exact. -def_replace_encode_dict = { - u'\u00E7': '\x09', # LATIN SMALL LETTER C WITH CEDILLA +GSM_REPLACE_CHARSET = { + '\u00E7': '\x09', # LATIN SMALL LETTER C WITH CEDILLA - u'\u0391': '\x41', # GREEK CAPITAL LETTER ALPHA - u'\u0392': '\x42', # GREEK CAPITAL LETTER BETA - u'\u0395': '\x45', # GREEK CAPITAL LETTER EPSILON - u'\u0397': '\x48', # GREEK CAPITAL LETTER ETA - u'\u0399': '\x49', # GREEK CAPITAL LETTER IOTA - u'\u039A': '\x4B', # GREEK CAPITAL LETTER KAPPA - u'\u039C': '\x4D', # GREEK CAPITAL LETTER MU - u'\u039D': '\x4E', # GREEK CAPITAL LETTER NU - u'\u039F': '\x4F', # GREEK CAPITAL LETTER OMICRON - u'\u03A1': '\x50', # GREEK CAPITAL LETTER RHO - u'\u03A4': '\x54', # GREEK CAPITAL LETTER TAU - u'\u03A7': '\x58', # GREEK CAPITAL LETTER CHI - u'\u03A5': '\x59', # GREEK CAPITAL LETTER UPSILON - u'\u0396': '\x5A', # GREEK CAPITAL LETTER ZETA + '\u0391': '\x41', # GREEK CAPITAL LETTER ALPHA + '\u0392': '\x42', # GREEK CAPITAL LETTER BETA + '\u0395': '\x45', # GREEK CAPITAL LETTER EPSILON + '\u0397': '\x48', # GREEK CAPITAL LETTER ETA + '\u0399': '\x49', # GREEK CAPITAL LETTER IOTA + '\u039A': '\x4B', # GREEK CAPITAL LETTER KAPPA + '\u039C': '\x4D', # GREEK CAPITAL LETTER MU + '\u039D': '\x4E', # GREEK CAPITAL LETTER NU + '\u039F': '\x4F', # GREEK CAPITAL LETTER OMICRON + '\u03A1': '\x50', # GREEK CAPITAL LETTER RHO + '\u03A4': '\x54', # GREEK CAPITAL LETTER TAU + '\u03A7': '\x58', # GREEK CAPITAL LETTER CHI + '\u03A5': '\x59', # GREEK CAPITAL LETTER UPSILON + '\u0396': '\x5A', # GREEK CAPITAL LETTER ZETA } -QUESTION_MARK = chr(0x3f) +GSM_CHARSET = {**GSM_BASIC_CHARSET, **GSM_EXT_CHARSET} + +QUESTION_MARK = ord('\u003F') +ESCAPE = ord('\x1B') +NBSP = ord('\u00A0') -# unicode -> default GSM 03.38 -def_regular_encode_dict = \ - dict((u, g) for g, u in def_regular_decode_dict.iteritems()) +decoding_map = dict((ord(k), ord(v)) if len(k) == 1 else (bytes([ord(k[0]), ord(k[1])]), ord(v)) for k, v in GSM_CHARSET.items()) -# unicode -> default escaped GSM 03.38 characters -def_escape_encode_dict = \ - dict((u, g) for g, u in def_escape_decode_dict.iteritems()) +encoding_map = dict((ord(v), ord(k)) for k, v in GSM_BASIC_CHARSET.items()) +ext_encoding_map = dict((ord(v), ord(k[1])) for k, v in GSM_EXT_CHARSET.items()) -def encode(input_, errors='strict'): - """ - :type input_: unicode +replace_encode_map = dict((ord(k), ord(v)) for k, v in GSM_REPLACE_CHARSET.items()) - :return: string - """ - result = [] - for c in input_: - try: - result.append(def_regular_encode_dict[c]) - except KeyError: - if c in def_escape_encode_dict: - # OK, let's encode it as an escaped characters - result.append('\x1b') - result.append(def_escape_encode_dict[c]) +def encode_gsm0338(text, errors, encoding_map, ext_encoding_map, replace_encode_map): + encoded = b'' + for char in text: + ochar = ord(char) + ec = b'' + if ochar in encoding_map: + ec = encoding_map.get(ochar) + else: + if ochar in ext_encoding_map: + encoded += bytes([ESCAPE]) + ec = ext_encoding_map.get(ochar) + elif errors == 'strict': + raise UnicodeError("Invalid GSM character") + elif errors == 'replace': + ec = replace_encode_map.get(ochar, QUESTION_MARK) + elif errors == 'ignore': + pass else: - if errors == 'strict': - raise UnicodeError("Invalid GSM character") - elif errors == 'replace': - result.append( - def_replace_encode_dict.get(c, QUESTION_MARK)) - elif errors == 'ignore': - pass - else: - raise UnicodeError("Unknown error handling") + raise UnicodeError("Unknown error handling") + if isinstance(ec, int): + ec = bytes([ec]) + encoded += ec + return encoded, len(encoded) - ret = ''.join(result) - return ret, len(ret) +def decode_gsm0338(text, decoding_map): + decoded = '' + skip = None + for index, char in enumerate(bytes(text)): + next_char = index + 1 + if skip == index: + continue + if char != ESCAPE: + d = decoding_map.get(char) + elif char == ESCAPE and next_char < len(text): + ext_char = bytes([ESCAPE, text[next_char]]) + d = decoding_map.get(ext_char, NBSP) + if d != NBSP: + skip = next_char + else: + d = NBSP + decoded += chr(d) + return decoded, len(decoded) -def decode(input_, errors='strict'): - """ - :type input_: str +class GSM0338Codec(codecs.Codec): + def encode(self, input_, errors='strict'): + return encode_gsm0338(input_, errors, encoding_map, ext_encoding_map, replace_encode_map) + + def decode(self, input_, errors='strict'): + return decode_gsm0338(input_, decoding_map) + + +class GSM0338IncrementalEncoder(codecs.IncrementalEncoder): + def encode(self, input_, final=False): + return encode_gsm0338(input_, self.errors, encoding_map, ext_encoding_map, replace_encode_map)[0] + + +class GSM0338IncrementalDecoder(codecs.IncrementalDecoder): + def decode(self, input_, final=False): + return decode_gsm0338(input_, decoding_map)[0] + + +class GSM0338StreamReader(GSM0338Codec, codecs.StreamReader): + pass - :return: unicode - """ - result = [] - index = 0 - while index < len(input_): - c = input_[index] - index += 1 - if c == '\x1b': - if index < len(input_): - c = input_[index] - index += 1 - result.append(def_escape_decode_dict.get(c, u'\xa0')) - else: - result.append(u'\xa0') - else: - try: - result.append(def_regular_decode_dict[c]) - except KeyError: - # error handling: unassigned byte, must be > 0x7f - if errors == 'strict': - raise UnicodeError("Unrecognized GSM character") - elif errors == 'replace': - result.append('?') - elif errors == 'ignore': - pass - else: - raise UnicodeError("Unknown error handling") - ret = u''.join(result) - return ret, len(ret) +class GSM0338StreamWriter(GSM0338Codec, codecs.StreamWriter): + pass -# encodings module API -def getregentry(encoding): - if encoding == 'gsm0338': - return codecs.CodecInfo(name='gsm0338', - encode=encode, - decode=decode) +def search_gsm0338(encoding): + if encoding in ('gsm0338', 'gsm7'): + return codecs.CodecInfo( + name='gsm0338', + encode=GSM0338Codec().encode, + decode=GSM0338Codec().decode, + incrementalencoder=GSM0338IncrementalEncoder, + incrementaldecoder=GSM0338IncrementalDecoder, + streamwriter=GSM0338StreamWriter, + streamreader=GSM0338StreamReader + ) + return None -# Codec registration -codecs.register(getregentry) +def is_valid_gsm(text): + ''' Validate if `text` is a valid gsm 03.338. ''' + r = '^[' + re.escape(''.join(list(GSM_CHARSET.values()))) + ']+$' + return re.match(r, text, re.UNICODE) is not None -def is_gsm_text(text): - """Returns True if ``text`` can be encoded as gsm text""" - try: - text.encode("gsm0338") - except UnicodeError: - return False - except: - traceback.print_exc(file=sys.stdout) - return False - return True +codecs.register(search_gsm0338) \ No newline at end of file diff --git a/messaging/sms/pdu.py b/messaging/sms/pdu.py index 9d680d5..b517bea 100644 --- a/messaging/sms/pdu.py +++ b/messaging/sms/pdu.py @@ -1,7 +1,7 @@ # see LICENSE -class Pdu(object): +class Pdu: def __init__(self, pdu, len_smsc, cnt=1, seq=1): self.pdu = pdu.upper() diff --git a/messaging/sms/submit.py b/messaging/sms/submit.py index e426f13..276ed4f 100644 --- a/messaging/sms/submit.py +++ b/messaging/sms/submit.py @@ -3,18 +3,19 @@ from datetime import datetime, timedelta import re +import logging from messaging.sms import consts -from messaging.utils import (debug, encode_str, clean_number, +from messaging.utils import (encode_str, clean_number, pack_8bits_to_ucs2, pack_8bits_to_7bits, pack_8bits_to_8bit, timedelta_to_relative_validity, datetime_to_absolute_validity) from messaging.sms.base import SmsBase -from messaging.sms.gsm0338 import is_gsm_text +from messaging.sms.gsm0338 import is_valid_gsm from messaging.sms.pdu import Pdu -VALID_NUMBER = re.compile("^\+?\d{3,20}$") +VALID_NUMBER = re.compile(r"^\+?\d{3,20}$") class SmsSubmit(SmsBase): @@ -90,16 +91,16 @@ def to_pdu(self): pdu += sms_phone_pdu pdu += tppid_pdu pdu += sms_msg_pdu[0] - debug("smsc_pdu: %s" % smsc_pdu) - debug("sms_submit_pdu: %s" % sms_submit_pdu) - debug("tpmessref_pdu: %s" % tpmessref_pdu) - debug("sms_phone_pdu: %s" % sms_phone_pdu) - debug("tppid_pdu: %s" % tppid_pdu) - debug("sms_msg_pdu: %s" % sms_msg_pdu) - debug("-" * 20) - debug("full_pdu: %s" % pdu) - debug("full_text: %s" % self.text) - debug("-" * 20) + logging.debug("smsc_pdu: %s" % smsc_pdu) + logging.debug("sms_submit_pdu: %s" % sms_submit_pdu) + logging.debug("tpmessref_pdu: %s" % tpmessref_pdu) + logging.debug("sms_phone_pdu: %s" % sms_phone_pdu) + logging.debug("tppid_pdu: %s" % tppid_pdu) + logging.debug("sms_msg_pdu: %s" % sms_msg_pdu) + logging.debug("-" * 20) + logging.debug("full_pdu: %s" % pdu) + logging.debug("full_text: %s" % self.text) + logging.debug("-" * 20) return [Pdu(pdu, len_smsc)] # multipart SMS @@ -114,16 +115,16 @@ def to_pdu(self): pdu += sms_phone_pdu pdu += tppid_pdu pdu += sms_msg_pdu_item - debug("smsc_pdu: %s" % smsc_pdu) - debug("sms_submit_pdu: %s" % sms_submit_pdu) - debug("tpmessref_pdu: %s" % tpmessref_pdu) - debug("sms_phone_pdu: %s" % sms_phone_pdu) - debug("tppid_pdu: %s" % tppid_pdu) - debug("sms_msg_pdu: %s" % sms_msg_pdu_item) - debug("-" * 20) - debug("full_pdu: %s" % pdu) - debug("full_text: %s" % self.text) - debug("-" * 20) + logging.debug("smsc_pdu: %s" % smsc_pdu) + logging.debug("sms_submit_pdu: %s" % sms_submit_pdu) + logging.debug("tpmessref_pdu: %s" % tpmessref_pdu) + logging.debug("sms_phone_pdu: %s" % sms_phone_pdu) + logging.debug("tppid_pdu: %s" % tppid_pdu) + logging.debug("sms_msg_pdu: %s" % sms_msg_pdu_item) + logging.debug("-" * 20) + logging.debug("full_pdu: %s" % pdu) + logging.debug("full_text: %s" % self.text) + logging.debug("-" * 20) pdu_list.append(Pdu(pdu, len_smsc, cnt=cnt, seq=i + 1)) @@ -204,7 +205,7 @@ def _get_sms_submit_pdu(self, udh=False): def _get_msg_pdu(self): # Data coding scheme if self.fmt is None: - if is_gsm_text(self.text): + if is_valid_gsm(self.text): self.fmt = 0x00 else: self.fmt = 0x08 @@ -307,6 +308,8 @@ def _split_sms_message(self, text): sms_ref &= 0xFF for i, msg in enumerate(msgs): + if isinstance(msg, bytes): + msg = msg.decode() i += 1 total_parts = len(msgs) if limit == consts.SEVENBIT_SIZE: @@ -314,9 +317,9 @@ def _split_sms_message(self, text): chr(sms_ref) + chr(total_parts) + chr(i)) padding = " " else: - udh = (unichr(int("%04x" % ((udh_len << 8) | mid), 16)) + - unichr(int("%04x" % ((data_len << 8) | sms_ref), 16)) + - unichr(int("%04x" % ((total_parts << 8) | i), 16))) + udh = (chr(int("%04x" % ((udh_len << 8) | mid), 16)) + + chr(int("%04x" % ((data_len << 8) | sms_ref), 16)) + + chr(int("%04x" % ((total_parts << 8) | i), 16))) padding = "" pdu_msgs.append(packing_func(padding + msg, udh)) @@ -327,4 +330,4 @@ def _get_rand_id(self): if not self.id_list: self.id_list = range(0, 255) - return self.id_list.pop(0) + return list(self.id_list).pop(0) diff --git a/messaging/sms/udh.py b/messaging/sms/udh.py index eecfa23..7087f74 100644 --- a/messaging/sms/udh.py +++ b/messaging/sms/udh.py @@ -1,7 +1,7 @@ # See LICENSE -class PortAddress(object): +class PortAddress: def __init__(self, dest_port, orig_port, eight_bits): self.dest_port = dest_port @@ -13,7 +13,7 @@ def __repr__(self): return "" % args -class ConcatReference(object): +class ConcatReference: def __init__(self, ref, cnt, seq, eight_bits): self.ref = ref @@ -26,7 +26,7 @@ def __repr__(self): return "" % args -class UserDataHeader(object): +class UserDataHeader: def __init__(self): self.concat = None diff --git a/messaging/sms/wap.py b/messaging/sms/wap.py index 46611ab..7f5ff90 100644 --- a/messaging/sms/wap.py +++ b/messaging/sms/wap.py @@ -6,7 +6,7 @@ def is_a_wap_push_notification(s): - if not isinstance(s, str): + if not isinstance(s, bytes): raise TypeError("data must be an array.array serialised to string") data = array("B", s) diff --git a/messaging/test/test_gsm_encoding.py b/messaging/test/test_gsm_encoding.py deleted file mode 100644 index 364cb56..0000000 --- a/messaging/test/test_gsm_encoding.py +++ /dev/null @@ -1,267 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright (C) 2011 Sphere Systems Ltd -# Author: Andrew Bird -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -"""Unittests for the gsm encoding/decoding module""" - -import unittest -import messaging.sms.gsm0338 # imports GSM7 codec - -# Reversed from: ftp://ftp.unicode.org/Public/MAPPINGS/ETSI/GSM0338.TXT -MAP = { -# unichr(0x0000): (0x0000, 0x00), # Null - u'@': (0x0040, 0x00), - u'£': (0x00a3, 0x01), - u'$': (0x0024, 0x02), - u'¥': (0x00a5, 0x03), - u'è': (0x00e8, 0x04), - u'é': (0x00e9, 0x05), - u'ù': (0x00f9, 0x06), - u'ì': (0x00ec, 0x07), - u'ò': (0x00f2, 0x08), - u'Ç': (0x00c7, 0x09), # LATIN CAPITAL LETTER C WITH CEDILLA - unichr(0x000a): (0x000a, 0x0a), # Linefeed - u'Ø': (0x00d8, 0x0b), - u'ø': (0x00f8, 0x0c), - unichr(0x000d): (0x000d, 0x0d), # Carriage return - u'Å': (0x00c5, 0x0e), - u'å': (0x00e5, 0x0f), - u'Δ': (0x0394, 0x10), - u'_': (0x005f, 0x11), - u'Φ': (0x03a6, 0x12), - u'Γ': (0x0393, 0x13), - u'Λ': (0x039b, 0x14), - u'Ω': (0x03a9, 0x15), - u'Π': (0x03a0, 0x16), - u'Ψ': (0x03a8, 0x17), - u'Σ': (0x03a3, 0x18), - u'Θ': (0x0398, 0x19), - u'Ξ': (0x039e, 0x1a), - unichr(0x00a0): (0x00a0, 0x1b), # Escape to extension table (displayed - # as NBSP, on decode of invalid escape - # sequence) - u'Æ': (0x00c6, 0x1c), - u'æ': (0x00e6, 0x1d), - u'ß': (0x00df, 0x1e), - u'É': (0x00c9, 0x1f), - u' ': (0x0020, 0x20), - u'!': (0x0021, 0x21), - u'"': (0x0022, 0x22), - u'#': (0x0023, 0x23), - u'¤': (0x00a4, 0x24), - u'%': (0x0025, 0x25), - u'&': (0x0026, 0x26), - u'\'': (0x0027, 0x27), - u'{': (0x007b, 0x1b28), - u'}': (0x007d, 0x1b29), - u'*': (0x002a, 0x2a), - u'+': (0x002b, 0x2b), - u',': (0x002c, 0x2c), - u'-': (0x002d, 0x2d), - u'.': (0x002e, 0x2e), - u'\\': (0x005c, 0x1b2f), - u'0': (0x0030, 0x30), - u'1': (0x0031, 0x31), - u'2': (0x0032, 0x32), - u'3': (0x0033, 0x33), - u'4': (0x0034, 0x34), - u'5': (0x0035, 0x35), - u'6': (0x0036, 0x36), - u'7': (0x0037, 0x37), - u'8': (0x0038, 0x38), - u'9': (0x0039, 0x39), - u':': (0x003a, 0x3a), - u';': (0x003b, 0x3b), - u'[': (0x005b, 0x1b3c), - unichr(0x000c): (0x000c, 0x1b0a), # Formfeed - u']': (0x005d, 0x1b3e), - u'?': (0x003f, 0x3f), - u'|': (0x007c, 0x1b40), - u'A': (0x0041, 0x41), - u'B': (0x0042, 0x42), - u'C': (0x0043, 0x43), - u'D': (0x0044, 0x44), - u'E': (0x0045, 0x45), - u'F': (0x0046, 0x46), - u'G': (0x0047, 0x47), - u'H': (0x0048, 0x48), - u'I': (0x0049, 0x49), - u'J': (0x004a, 0x4a), - u'K': (0x004b, 0x4b), - u'L': (0x004c, 0x4c), - u'M': (0x004d, 0x4d), - u'N': (0x004e, 0x4e), - u'O': (0x004f, 0x4f), - u'P': (0x0050, 0x50), - u'Q': (0x0051, 0x51), - u'R': (0x0052, 0x52), - u'S': (0x0053, 0x53), - u'T': (0x0054, 0x54), - u'U': (0x0055, 0x55), - u'V': (0x0056, 0x56), - u'W': (0x0057, 0x57), - u'X': (0x0058, 0x58), - u'Y': (0x0059, 0x59), - u'Z': (0x005a, 0x5a), - u'Ä': (0x00c4, 0x5b), - u'Ö': (0x00d6, 0x5c), - u'Ñ': (0x00d1, 0x5d), - u'Ü': (0x00dc, 0x5e), - u'§': (0x00a7, 0x5f), - u'¿': (0x00bf, 0x60), - u'a': (0x0061, 0x61), - u'b': (0x0062, 0x62), - u'c': (0x0063, 0x63), - u'd': (0x0064, 0x64), - u'€': (0x20ac, 0x1b65), - u'f': (0x0066, 0x66), - u'g': (0x0067, 0x67), - u'h': (0x0068, 0x68), - u'<': (0x003c, 0x3c), - u'j': (0x006a, 0x6a), - u'k': (0x006b, 0x6b), - u'l': (0x006c, 0x6c), - u'm': (0x006d, 0x6d), - u'n': (0x006e, 0x6e), - u'~': (0x007e, 0x1b3d), - u'p': (0x0070, 0x70), - u'q': (0x0071, 0x71), - u'r': (0x0072, 0x72), - u's': (0x0073, 0x73), - u't': (0x0074, 0x74), - u'>': (0x003e, 0x3e), - u'v': (0x0076, 0x76), - u'i': (0x0069, 0x69), - u'x': (0x0078, 0x78), - u'^': (0x005e, 0x1b14), - u'z': (0x007a, 0x7a), - u'ä': (0x00e4, 0x7b), - u'ö': (0x00f6, 0x7c), - u'ñ': (0x00f1, 0x7d), - u'ü': (0x00fc, 0x7e), - u'à': (0x00e0, 0x7f), - u'¡': (0x00a1, 0x40), - u'/': (0x002f, 0x2f), - u'o': (0x006f, 0x6f), - u'u': (0x0075, 0x75), - u'w': (0x0077, 0x77), - u'y': (0x0079, 0x79), - u'e': (0x0065, 0x65), - u'=': (0x003d, 0x3d), - u'(': (0x0028, 0x28), - u')': (0x0029, 0x29), -} - -GREEK_MAP = { # Note: these might look like Latin uppercase, but they aren't - u'Α': (0x0391, 0x41), - u'Β': (0x0392, 0x42), - u'Ε': (0x0395, 0x45), - u'Η': (0x0397, 0x48), - u'Ι': (0x0399, 0x49), - u'Κ': (0x039a, 0x4b), - u'Μ': (0x039c, 0x4d), - u'Ν': (0x039d, 0x4e), - u'Ο': (0x039f, 0x4f), - u'Ρ': (0x03a1, 0x50), - u'Τ': (0x03a4, 0x54), - u'Χ': (0x03a7, 0x58), - u'Υ': (0x03a5, 0x59), - u'Ζ': (0x0396, 0x5a), -} - -QUIRK_MAP = { - u'ç': (0x00e7, 0x09), -} - -BAD = -1 - - -class TestEncodingFunctions(unittest.TestCase): - - def test_encoding_supported_unicode_gsm(self): - - for key in MAP.keys(): - # Use 'ignore' so that we see the code tested, not an exception - s_gsm = key.encode('gsm0338', 'ignore') - - if len(s_gsm) == 1: - i_gsm = ord(s_gsm) - elif len(s_gsm) == 2: - i_gsm = (ord(s_gsm[0]) << 8) + ord(s_gsm[1]) - else: - i_gsm = BAD # so we see the comparison, not an exception - - # We shouldn't generate an invalid escape sequence - if key == unichr(0x00a0): - self.assertEqual(BAD, i_gsm) - else: - self.assertEqual(MAP[key][1], i_gsm) - - def test_encoding_supported_greek_unicode_gsm(self): - # Note: Conversion is one way, hence no corresponding decode test - - for key in GREEK_MAP.keys(): - # Use 'replace' so that we trigger the mapping - s_gsm = key.encode('gsm0338', 'replace') - - if len(s_gsm) == 1: - i_gsm = ord(s_gsm) - else: - i_gsm = BAD # so we see the comparison, not an exception - - self.assertEqual(GREEK_MAP[key][1], i_gsm) - - def test_encoding_supported_quirk_unicode_gsm(self): - # Note: Conversion is one way, hence no corresponding decode test - - for key in QUIRK_MAP.keys(): - # Use 'replace' so that we trigger the mapping - s_gsm = key.encode('gsm0338', 'replace') - - if len(s_gsm) == 1: - i_gsm = ord(s_gsm) - else: - i_gsm = BAD # so we see the comparison, not an exception - - self.assertEqual(QUIRK_MAP[key][1], i_gsm) - - def test_decoding_supported_unicode_gsm(self): - for key in MAP.keys(): - i_gsm = MAP[key][1] - if i_gsm <= 0xff: - s_gsm = chr(i_gsm) - elif i_gsm <= 0xffff: - s_gsm = chr((i_gsm & 0xff00) >> 8) - s_gsm += chr(i_gsm & 0x00ff) - - s_unicode = s_gsm.decode('gsm0338', 'strict') - self.assertEqual(MAP[key][0], ord(s_unicode)) - - def test_is_gsm_text_true(self): - for key in MAP.keys(): - if key == unichr(0x00a0): - continue - self.assertEqual(messaging.sms.gsm0338.is_gsm_text(key), True) - - def test_is_gsm_text_false(self): - self.assertEqual( - messaging.sms.gsm0338.is_gsm_text(unichr(0x00a0)), False) - - for i in xrange(1, 0xffff + 1): - if unichr(i) not in MAP: - # Note: it's a little odd, but on error we want to see values - if messaging.sms.gsm0338.is_gsm_text(unichr(i)) is not False: - self.assertEqual(BAD, i) diff --git a/messaging/utils.py b/messaging/utils.py index 1456d79..0bf1d40 100644 --- a/messaging/utils.py +++ b/messaging/utils.py @@ -1,8 +1,10 @@ from array import array from datetime import timedelta, tzinfo from math import floor -import sys +import re +import binascii +HEX_STR = re.compile(r"^[0-9A-Fa-f]+$") class FixedOffset(tzinfo): """Fixed offset in minutes east from UTC.""" @@ -45,29 +47,15 @@ def dst(self, dt): def bytes_to_str(b): - if sys.version_info >= (3,): - return b.decode('latin1') - + if isinstance(b, bytes): + return b.decode() return b -def to_array(pdu): +def hex_to_int_array(pdu): return array('B', [int(pdu[i:i + 2], 16) for i in range(0, len(pdu), 2)]) -def to_bytes(s): - if sys.version_info >= (3,): - return bytes(s) - - return ''.join(map(unichr, s)) - - -def debug(s): - # set this to True if you want to poke at PDU encoding/decoding - if False: - print s - - def swap(s): """Swaps ``s`` according to GSM 23.040""" what = s[:] @@ -87,18 +75,33 @@ def clean_number(n): def encode_str(s): - """Returns the hexadecimal representation of ``s``""" + """ + Convert a string to hexidecimal values + + :param s: string + :type s: str + :return: hexidecimal representation of given string + :rtype: str + """ + # return binascii.hexlify(s.encode()).decode() return ''.join(["%02x" % ord(n) for n in s]) def encode_bytes(b): - return ''.join(["%02x" % n for n in b]) + """ + Convert to hexidecimal representation + + :param b: byte array + :type b: bytes + :return: Byte string converted to hex and returned as a string + :rtype: str + """ + return binascii.hexlify(b).decode() def pack_8bits_to_7bits(message, udh=None): pdu = "" txt = bytes_to_str(message) - if udh is None: tl = len(txt) txt += '\x00' @@ -147,6 +150,8 @@ def pack_8bits_to_7bits(message, udh=None): def pack_8bits_to_8bit(message, udh=None): text = message if udh is not None: + if isinstance(udh, bytes): + udh = udh.decode() text = udh + text mlen = len(text) @@ -160,6 +165,8 @@ def pack_8bits_to_ucs2(message, udh=None): nmesg = '' if udh is not None: + if isinstance(udh, bytes): + udh = udh.decode() text = udh + text for n in text: @@ -169,15 +176,67 @@ def pack_8bits_to_ucs2(message, udh=None): message = chr(mlen) + nmesg return encode_str(message) - def unpack_msg(pdu): + if isinstance(pdu, (array, list)): + return unpack_list_msg(pdu) + + if isinstance(pdu, bytes): + return unpack_hex_bytes_msg(pdu) + + if isinstance(pdu, str) and HEX_STR.match(pdu): + return unpack_hex_str_msg(pdu) + + raise TypeError('Unhandled Type %s' % type(pdu)) + +def unpack_hex_str_msg(pdu): """Unpacks ``pdu`` into septets and returns the decoded string""" # Taken/modified from Dave Berkeley's pysms package count = last = 0 result = [] - for i in range(0, len(pdu), 2): - byte = int(pdu[i:i + 2], 16) + prev_char = '' + count = last = 0 + result = [] + + for index, char in enumerate(pdu): + if index % 2 == 1: + byte = int(prev_char + char, 16) + else: + prev_char = char + continue + mask = 0x7F >> count + out = ((byte & mask) << count) + last + last = byte >> (7 - count) + result.append(out) + + if len(result) >= 0xa0: + break + + if count == 6: + result.append(last) + last = 0 + + count = (count + 1) % 7 + + return bytes(result) + + +def unpack_hex_bytes_msg(pdu): + """Unpacks ``pdu`` into septets and returns the decoded string""" + # Taken/modified from Dave Berkeley's pysms package + count = last = 0 + result = [] + + prev_byte = b'' + count = last = 0 + result = [] + + for index, byte in enumerate(pdu): + if index % 2 == 1: + byte = int(bytes([prev_byte, byte]), 16) + else: + prev_byte = byte + continue mask = 0x7F >> count out = ((byte & mask) << count) + last last = byte >> (7 - count) @@ -192,10 +251,10 @@ def unpack_msg(pdu): count = (count + 1) % 7 - return to_bytes(result) + return bytes(result) -def unpack_msg2(pdu): +def unpack_list_msg(pdu): """Unpacks ``pdu`` into septets and returns the decoded string""" # Taken/modified from Dave Berkeley's pysms package count = last = 0 @@ -216,7 +275,7 @@ def unpack_msg2(pdu): count = (count + 1) % 7 - return to_bytes(result) + return bytes(result) def timedelta_to_relative_validity(t): diff --git a/resources/pydump.py b/resources/pydump.py index 9daa570..d577264 100644 --- a/resources/pydump.py +++ b/resources/pydump.py @@ -105,6 +105,6 @@ s += " %02x" % unpack('B', c) # 000000 00 e0 1e a7 05 6f 00 10 - print "%06x%s" % (offset, s) + print("%06x%s" % (offset, s)) offset += perline diff --git a/setup.py b/setup.py index d826050..c23f98c 100644 --- a/setup.py +++ b/setup.py @@ -2,31 +2,27 @@ import sys from messaging import VERSION -extra = {} -if sys.version_info >= (3,): - extra['use_2to3'] = True - -setup(name="python-messaging", - version='%s.%s.%s' % VERSION, - description='SMS/MMS encoder/decoder', - license=open('COPYING').read(), - packages=find_packages(), - install_requires=['nose'], - zip_safe=True, - test_suite='nose.collector', - classifiers=[ - 'Development Status :: 4 - Beta', +setup( + name="python-messaging", + version='%s.%s.%s' % VERSION, + description='SMS/MMS encoder/decoder', + license=open('COPYING').read(), + packages=find_packages(exclude=["tests"]), + py_modules=["messaging"], + include_package_data=True, + package_data={'messaging': ['README.md']}, + zip_safe=False, + classifiers=[ + 'Development Status :: 5 - Production/Stable', 'License :: OSI Approved :: GNU General Public License (GPL)', 'Natural Language :: English', 'Operating System :: POSIX :: Linux', 'Programming Language :: Python', - 'Programming Language :: Python :: 2.5', - 'Programming Language :: Python :: 2.6', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.1', - 'Programming Language :: Python :: 3.2', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', 'Topic :: Communications :: Telephony', - ], - **extra + ], ) diff --git a/messaging/test/__init__.py b/tests/__init__.py similarity index 100% rename from messaging/test/__init__.py rename to tests/__init__.py diff --git a/messaging/test/mms-data/27d0a048cd79555de05283a22372b0eb.mms b/tests/mms-data/27d0a048cd79555de05283a22372b0eb.mms similarity index 100% rename from messaging/test/mms-data/27d0a048cd79555de05283a22372b0eb.mms rename to tests/mms-data/27d0a048cd79555de05283a22372b0eb.mms diff --git a/messaging/test/mms-data/BTMMS.MMS b/tests/mms-data/BTMMS.MMS similarity index 100% rename from messaging/test/mms-data/BTMMS.MMS rename to tests/mms-data/BTMMS.MMS diff --git a/messaging/test/mms-data/NOWMMS.MMS b/tests/mms-data/NOWMMS.MMS similarity index 100% rename from messaging/test/mms-data/NOWMMS.MMS rename to tests/mms-data/NOWMMS.MMS diff --git a/messaging/test/mms-data/SEC-SGHS300M.mms b/tests/mms-data/SEC-SGHS300M.mms similarity index 100% rename from messaging/test/mms-data/SEC-SGHS300M.mms rename to tests/mms-data/SEC-SGHS300M.mms diff --git a/messaging/test/mms-data/SIMPLE.MMS b/tests/mms-data/SIMPLE.MMS similarity index 100% rename from messaging/test/mms-data/SIMPLE.MMS rename to tests/mms-data/SIMPLE.MMS diff --git a/messaging/test/mms-data/SonyEricssonT310-R201.mms b/tests/mms-data/SonyEricssonT310-R201.mms similarity index 100% rename from messaging/test/mms-data/SonyEricssonT310-R201.mms rename to tests/mms-data/SonyEricssonT310-R201.mms diff --git a/messaging/test/mms-data/TOMSLOT.MMS b/tests/mms-data/TOMSLOT.MMS similarity index 100% rename from messaging/test/mms-data/TOMSLOT.MMS rename to tests/mms-data/TOMSLOT.MMS diff --git a/messaging/test/mms-data/gallery2test.mms b/tests/mms-data/gallery2test.mms similarity index 100% rename from messaging/test/mms-data/gallery2test.mms rename to tests/mms-data/gallery2test.mms diff --git a/messaging/test/mms-data/iPhone.mms b/tests/mms-data/iPhone.mms similarity index 100% rename from messaging/test/mms-data/iPhone.mms rename to tests/mms-data/iPhone.mms diff --git a/messaging/test/mms-data/images_are_cut_off_debug.mms b/tests/mms-data/images_are_cut_off_debug.mms similarity index 100% rename from messaging/test/mms-data/images_are_cut_off_debug.mms rename to tests/mms-data/images_are_cut_off_debug.mms diff --git a/messaging/test/mms-data/m.mms b/tests/mms-data/m.mms similarity index 100% rename from messaging/test/mms-data/m.mms rename to tests/mms-data/m.mms diff --git a/messaging/test/mms-data/openwave.mms b/tests/mms-data/openwave.mms similarity index 100% rename from messaging/test/mms-data/openwave.mms rename to tests/mms-data/openwave.mms diff --git a/messaging/test/mms-data/projekt_exempel.mms b/tests/mms-data/projekt_exempel.mms similarity index 100% rename from messaging/test/mms-data/projekt_exempel.mms rename to tests/mms-data/projekt_exempel.mms diff --git a/tests/test_gsm_encoding.py b/tests/test_gsm_encoding.py new file mode 100644 index 0000000..0ba0b18 --- /dev/null +++ b/tests/test_gsm_encoding.py @@ -0,0 +1,256 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2011 Sphere Systems Ltd +# Author: Andrew Bird +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +"""Unittests for the gsm encoding/decoding module""" + +from unittest import TestCase +from messaging.sms.gsm0338 import is_valid_gsm, decoding_map # imports GSM7 codec +# Reversed from: ftp://ftp.unicode.org/Public/MAPPINGS/ETSI/GSM0338.TXT +MAP = { +# chr(0x0000): (0x0000, 0x00), # Null + '@': (0x0040, 0x00), + '£': (0x00a3, 0x01), + '$': (0x0024, 0x02), + '¥': (0x00a5, 0x03), + 'è': (0x00e8, 0x04), + 'é': (0x00e9, 0x05), + 'ù': (0x00f9, 0x06), + 'ì': (0x00ec, 0x07), + 'ò': (0x00f2, 0x08), + 'Ç': (0x00c7, 0x09), # LATIN CAPITAL LETTER C WITH CEDILLA + chr(0x000a): (0x000a, 0x0a), # Linefeed + 'Ø': (0x00d8, 0x0b), + 'ø': (0x00f8, 0x0c), + chr(0x000d): (0x000d, 0x0d), # Carriage return + 'Å': (0x00c5, 0x0e), + 'å': (0x00e5, 0x0f), + 'Δ': (0x0394, 0x10), + '_': (0x005f, 0x11), + 'Φ': (0x03a6, 0x12), + 'Γ': (0x0393, 0x13), + 'Λ': (0x039b, 0x14), + 'Ω': (0x03a9, 0x15), + 'Π': (0x03a0, 0x16), + 'Ψ': (0x03a8, 0x17), + 'Σ': (0x03a3, 0x18), + 'Θ': (0x0398, 0x19), + 'Ξ': (0x039e, 0x1a), + chr(0x00a0): (0x00a0, 0x1b), # Escape to extension table (displayed + # as NBSP, on decode of invalid escape + # sequence) + 'Æ': (0x00c6, 0x1c), + 'æ': (0x00e6, 0x1d), + 'ß': (0x00df, 0x1e), + 'É': (0x00c9, 0x1f), + ' ': (0x0020, 0x20), + '!': (0x0021, 0x21), + '"': (0x0022, 0x22), + '#': (0x0023, 0x23), + '¤': (0x00a4, 0x24), + '%': (0x0025, 0x25), + '&': (0x0026, 0x26), + '\'': (0x0027, 0x27), + '{': (0x007b, 0x1b28), + '}': (0x007d, 0x1b29), + '*': (0x002a, 0x2a), + '+': (0x002b, 0x2b), + ',': (0x002c, 0x2c), + '-': (0x002d, 0x2d), + '.': (0x002e, 0x2e), + '\\': (0x005c, 0x1b2f), + '0': (0x0030, 0x30), + '1': (0x0031, 0x31), + '2': (0x0032, 0x32), + '3': (0x0033, 0x33), + '4': (0x0034, 0x34), + '5': (0x0035, 0x35), + '6': (0x0036, 0x36), + '7': (0x0037, 0x37), + '8': (0x0038, 0x38), + '9': (0x0039, 0x39), + ':': (0x003a, 0x3a), + ';': (0x003b, 0x3b), + '[': (0x005b, 0x1b3c), + chr(0x000c): (0x000c, 0x1b0a), # Formfeed + ']': (0x005d, 0x1b3e), + '?': (0x003f, 0x3f), + '|': (0x007c, 0x1b40), + 'A': (0x0041, 0x41), + 'B': (0x0042, 0x42), + 'C': (0x0043, 0x43), + 'D': (0x0044, 0x44), + 'E': (0x0045, 0x45), + 'F': (0x0046, 0x46), + 'G': (0x0047, 0x47), + 'H': (0x0048, 0x48), + 'I': (0x0049, 0x49), + 'J': (0x004a, 0x4a), + 'K': (0x004b, 0x4b), + 'L': (0x004c, 0x4c), + 'M': (0x004d, 0x4d), + 'N': (0x004e, 0x4e), + 'O': (0x004f, 0x4f), + 'P': (0x0050, 0x50), + 'Q': (0x0051, 0x51), + 'R': (0x0052, 0x52), + 'S': (0x0053, 0x53), + 'T': (0x0054, 0x54), + 'U': (0x0055, 0x55), + 'V': (0x0056, 0x56), + 'W': (0x0057, 0x57), + 'X': (0x0058, 0x58), + 'Y': (0x0059, 0x59), + 'Z': (0x005a, 0x5a), + 'Ä': (0x00c4, 0x5b), + 'Ö': (0x00d6, 0x5c), + 'Ñ': (0x00d1, 0x5d), + 'Ü': (0x00dc, 0x5e), + '§': (0x00a7, 0x5f), + '¿': (0x00bf, 0x60), + 'a': (0x0061, 0x61), + 'b': (0x0062, 0x62), + 'c': (0x0063, 0x63), + 'd': (0x0064, 0x64), + '€': (0x20ac, 0x1b65), + 'f': (0x0066, 0x66), + 'g': (0x0067, 0x67), + 'h': (0x0068, 0x68), + '<': (0x003c, 0x3c), + 'j': (0x006a, 0x6a), + 'k': (0x006b, 0x6b), + 'l': (0x006c, 0x6c), + 'm': (0x006d, 0x6d), + 'n': (0x006e, 0x6e), + '~': (0x007e, 0x1b3d), + 'p': (0x0070, 0x70), + 'q': (0x0071, 0x71), + 'r': (0x0072, 0x72), + 's': (0x0073, 0x73), + 't': (0x0074, 0x74), + '>': (0x003e, 0x3e), + 'v': (0x0076, 0x76), + 'i': (0x0069, 0x69), + 'x': (0x0078, 0x78), + '^': (0x005e, 0x1b14), + 'z': (0x007a, 0x7a), + 'ä': (0x00e4, 0x7b), + 'ö': (0x00f6, 0x7c), + 'ñ': (0x00f1, 0x7d), + 'ü': (0x00fc, 0x7e), + 'à': (0x00e0, 0x7f), + '¡': (0x00a1, 0x40), + '/': (0x002f, 0x2f), + 'o': (0x006f, 0x6f), + 'u': (0x0075, 0x75), + 'w': (0x0077, 0x77), + 'y': (0x0079, 0x79), + 'e': (0x0065, 0x65), + '=': (0x003d, 0x3d), + '(': (0x0028, 0x28), + ')': (0x0029, 0x29), +} + +GREEK_MAP = { # Note: these might look like Latin uppercase, but they aren't + 'Α': (0x0391, 0x41), + 'Β': (0x0392, 0x42), + 'Ε': (0x0395, 0x45), + 'Η': (0x0397, 0x48), + 'Ι': (0x0399, 0x49), + 'Κ': (0x039a, 0x4b), + 'Μ': (0x039c, 0x4d), + 'Ν': (0x039d, 0x4e), + 'Ο': (0x039f, 0x4f), + 'Ρ': (0x03a1, 0x50), + 'Τ': (0x03a4, 0x54), + 'Χ': (0x03a7, 0x58), + 'Υ': (0x03a5, 0x59), + 'Ζ': (0x0396, 0x5a), +} + +QUIRK_MAP = { + 'ç': (0x00e7, 0x09), +} + +BAD = -1 + + +class TestEncodingFunctions(TestCase): + + def test_encoding_supported_unicode_gsm(self): + + for key in list(MAP.keys()): + # Use 'ignore' so that we see the code tested, not an exception + s_gsm = key.encode('gsm0338', 'ignore') + if len(s_gsm) == 1: + i_gsm = ord(s_gsm) + elif len(s_gsm) == 2: + i_gsm = (s_gsm[0] << 8) + s_gsm[1] + else: + i_gsm = BAD # so we see the comparison, not an exception + + # We shouldn't generate an invalid escape sequence + if key == chr(0x00a0): + self.assertEqual(BAD, i_gsm) + else: + self.assertEqual(MAP[key][1], i_gsm) + + def test_encoding_supported_greek_unicode_gsm(self): + # Note: Conversion is one way, hence no corresponding decode test + + for key in list(GREEK_MAP.keys()): + # Use 'replace' so that we trigger the mapping + s_gsm = key.encode('gsm0338', 'replace') + if len(s_gsm) != 1: + s_gsm = BAD # so we see the comparison, not an exception + + self.assertEqual(GREEK_MAP[key][1], ord(s_gsm)) + + def test_encoding_supported_quirk_unicode_gsm(self): + # Note: Conversion is one way, hence no corresponding decode test + + for key in list(QUIRK_MAP.keys()): + # Use 'replace' so that we trigger the mapping + s_gsm = key.encode('gsm0338', 'replace') + if len(s_gsm) != 1: + s_gsm = BAD # so we see the comparison, not an exception + + self.assertEqual(QUIRK_MAP[key][1], ord(s_gsm)) + + def test_decoding_supported_unicode_gsm(self): + for key in list(MAP.keys()): + i_gsm = MAP[key][1] + if i_gsm <= 0xff: + s_unicode = bytes([i_gsm]).decode('gsm0338') + elif i_gsm <= 0xffff: + s_unicode = bytes([((i_gsm & 0xff00) >> 8), i_gsm & 0x00ff]).decode('gsm0338') + + self.assertEqual(MAP[key][0], ord(s_unicode)) + + def test_is_valid_gsm_true(self): + for key in list(MAP.keys()): + if key == chr(0x00a0): + continue + self.assertTrue(is_valid_gsm(key)) + + def test_is_valid_gsm_false(self): + self.assertFalse(is_valid_gsm(chr(0x00a0))) + + for i in range(1, 0xffff + 1): + if chr(i) not in MAP: + # Note: it's a little odd, but on error we want to see values + if is_valid_gsm(chr(i)): + self.assertEqual(BAD, i) diff --git a/messaging/test/test_mms.py b/tests/test_mms.py similarity index 71% rename from messaging/test/test_mms.py rename to tests/test_mms.py index 2cf6bf1..b1836e1 100644 --- a/messaging/test/test_mms.py +++ b/tests/test_mms.py @@ -2,7 +2,8 @@ from array import array import datetime import os -import unittest +import binascii +from unittest import TestCase from messaging.mms.message import MMSMessage @@ -11,7 +12,7 @@ DATA_DIR = os.path.join(os.path.dirname(__file__), 'mms-data') -class TestMmsDecoding(unittest.TestCase): +class TestMmsDecoding(TestCase): def test_decoding_from_data(self): path = os.path.join(DATA_DIR, 'iPhone.mms') @@ -35,7 +36,7 @@ def test_decoding_iPhone_mms(self): 'Message-Type': 'm-send-req', 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '0.smil', 'Type': 'application/smil'}), } - smil_data = '\n\n\n \n\n\n\n\n\n\n\n\n\n\n' + smil_data = b'\n\n\n \n\n\n\n\n\n\n\n\n\n\n' self.assertEqual(mms.headers, headers) self.assertEqual(mms.content_type, 'application/vnd.wap.multipart.related') @@ -57,7 +58,7 @@ def test_decoding_SIMPLE_mms(self): 'Content-Type': ('application/vnd.wap.multipart.related', {}), 'Subject': 'Simple message', } - text_data = "This is a simple MMS message with a single text body part." + text_data = b"This is a simple MMS message with a single text body part." self.assertEqual(mms.headers, headers) self.assertEqual(mms.content_type, 'application/vnd.wap.multipart.related') @@ -76,8 +77,8 @@ def test_decoding_BTMMS_mms(self): 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '', 'Type': 'application/smil'}), 'Subject': 'BT Ignite MMS', } - smil_data = '\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n' - text_data = 'BT Ignite\r\n\r\nMMS Services' + smil_data = b'\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n' + text_data = b'BT Ignite\r\n\r\nMMS Services' self.assertEqual(mms.headers, headers) self.assertEqual(mms.content_type, 'application/vnd.wap.multipart.related') @@ -94,15 +95,15 @@ def test_decoding_TOMSLOT_mms(self): mms = MMSMessage.from_file(path) self.assertTrue(isinstance(mms, MMSMessage)) headers = { - 'From': '616c6c616e40746f6d736c6f742e636f6d'.decode('hex'), + 'From': binascii.unhexlify(b'616c6c616e40746f6d736c6f742e636f6d').decode(), 'Transaction-Id': '1234', 'MMS-Version': '1.0', 'Message-Type': 'm-retrieve-conf', 'Date': datetime.datetime(2003, 2, 16, 3, 48, 33), 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '', 'Type': 'application/smil'}), 'Subject': 'Tom Slot Band', } - smil_data = '\r\n\t\r\n\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\r\n\t\r\n\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\t\r\n\t\r\n\r\n' - text_data = 'Presented by NowMMS\r\n' + smil_data = b'\r\n\t\r\n\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\r\n\t\r\n\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\t\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\t\r\n\t\r\n\r\n' + text_data = b'Presented by NowMMS\r\n' self.assertEqual(mms.headers, headers) self.assertEqual(mms.content_type, 'application/vnd.wap.multipart.related') @@ -125,13 +126,13 @@ def test_decoding_images_are_cut_off_debug_mms(self): headers = { 'From': '', 'Read-Reply': False, 'Transaction-Id': '2112410527', 'MMS-Version': '1.0', - 'To': '7464707440616a616a672e63646d'.decode('hex'), + 'To': binascii.unhexlify(b'7464707440616a616a672e63646d').decode(), 'Delivery-Report': False, 'Message-Type': 'm-send-req', 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '', 'Type': 'application/smil'}), 'Subject': 'Picture3', } - smil_data = '' + smil_data = b'' self.assertEqual(mms.headers, headers) self.assertEqual(len(mms.data_parts), 2) self.assertEqual(mms.content_type, @@ -147,7 +148,7 @@ def test_decoding_openwave_mms(self): mms = MMSMessage.from_file(path) self.assertTrue(isinstance(mms, MMSMessage)) headers = { - 'From': '2b31363530353535303030302f545950453d504c4d4e'.decode('hex'), + 'From': binascii.unhexlify(b'2b31363530353535303030302f545950453d504c4d4e').decode(), 'Message-Class': 'Personal', 'Transaction-Id': '1067263672', 'MMS-Version': '1.0', 'Priority': 'Normal', 'To': '112/TYPE=PLMN', @@ -155,8 +156,8 @@ def test_decoding_openwave_mms(self): 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '', 'Type': 'application/smil'}), 'Subject': 'rubrik', } - smil_data = '\n \n \n \n \n \n \n \n \n \n \n \n \n\n' - text_data = 'rubrik' + smil_data = b'\n \n \n \n \n \n \n \n \n \n \n \n \n\n' + text_data = b'rubrik' self.assertEqual(mms.headers, headers) self.assertEqual(len(mms.data_parts), 2) self.assertEqual(mms.content_type, @@ -178,8 +179,8 @@ def test_decoding_SonyEricssonT310_R201_mms(self): 'Date': datetime.datetime(2004, 3, 18, 7, 30, 34), 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '', 'Type': 'application/smil'}), } - text_data = 'Hej hopp' - smil_data = '\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n' + text_data = b'Hej hopp' + smil_data = b'\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n' self.assertEqual(mms.headers, headers) self.assertEqual(len(mms.data_parts), 4) self.assertEqual(mms.content_type, @@ -200,7 +201,7 @@ def test_decoding_gallery2test_mms(self): mms = MMSMessage.from_file(path) self.assertTrue(isinstance(mms, MMSMessage)) headers = { - 'From': '2b31363530353535303030302f545950453d504c4d4e'.decode('hex'), + 'From': binascii.unhexlify(b'2b31363530353535303030302f545950453d504c4d4e').decode(), 'Message-Class': 'Personal', 'Transaction-Id': '1118775337', 'MMS-Version': '1.0', 'Priority': 'Normal', 'To': 'Jg', 'Delivery-Report': False, @@ -208,8 +209,8 @@ def test_decoding_gallery2test_mms(self): 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '', 'Type': 'application/smil'}), 'Subject': 'Jgj', } - text_data = 'Jgj' - smil_data = '\n \n \n \n \n \n \n \n \n \n gnu-head\n \n \n \n\n' + text_data = b'Jgj' + smil_data = b'\n \n \n \n \n \n \n \n \n \n gnu-head\n \n \n \n\n' self.assertEqual(mms.headers, headers) self.assertEqual(len(mms.data_parts), 3) self.assertEqual(mms.content_type, @@ -237,8 +238,8 @@ def test_decoding_projekt_exempel_mms(self): 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '', 'Type': 'application/smil'}), 'Subject': 'Hej', } - smil_data = '\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n' - text_data = 'Jonatan \xc3\xa4r en GNU' + smil_data = b'\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n' + text_data = b'Jonatan \xc3\xa4r en GNU' self.assertEqual(mms.headers, headers) self.assertEqual(len(mms.data_parts), 3) self.assertEqual(mms.content_type, @@ -256,21 +257,21 @@ def test_decoding_m_mms(self): mms = MMSMessage.from_file(path) self.assertTrue(isinstance(mms, MMSMessage)) headers = { - 'From': '676f6c64706f737440686f746d61696c2e636f6d'.decode('hex'), + 'From': binascii.unhexlify(b'676f6c64706f737440686f746d61696c2e636f6d').decode(), 'Transaction-Id': '0000000001', 'MMS-Version': '1.0', 'Message-Type': 'm-retrieve-conf', 'Date': datetime.datetime(2002, 8, 9, 13, 8, 2), 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '', 'Type': 'application/smil'}), 'Subject': 'GOLD', } - text_data1 = 'Audio' - text_data2 = 'Text +' - text_data3 = 'tagtag.com/gold\r\n' - text_data4 = 'globalisierunglobalisierunglobalisierunglobalisierunglobalisierunglobalisierunglobalisierungnureisilabolg' - text_data5 = 'KLONE\r\nKLONE\r\n' - text_data6 = 'pr\xe4sentiert..' - text_data7 = 'GOLD' - smil_data = '\r\n\r\n\r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n' + text_data1 = b'Audio' + text_data2 = b'Text +' + text_data3 = b'tagtag.com/gold\r\n' + text_data4 = b'globalisierunglobalisierunglobalisierunglobalisierunglobalisierunglobalisierunglobalisierungnureisilabolg' + text_data5 = b'KLONE\r\nKLONE\r\n' + text_data6 = b'pr\xe4sentiert..' + text_data7 = b'GOLD' + smil_data = b'\r\n\r\n\r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n\r\n\r\n\r\n \r\n \r\n\r\n\r\n' self.assertEqual(mms.headers, headers) self.assertEqual(len(mms.data_parts), 9) self.assertEqual(mms.content_type, 'application/vnd.wap.multipart.related') @@ -321,10 +322,10 @@ def test_decoding_27d0a048cd79555de05283a22372b0eb_mms(self): 'Date': datetime.datetime(2004, 5, 23, 14, 14, 58), 'Content-Type': ('application/vnd.wap.multipart.related', {'Start': '', 'Type': 'application/smil'}), 'Subject': 'Angående art-tillhörighet', - #'Subject': 'Ang\xc3\xa5ende art-tillh\xc3\xb6righet', + # 'Subject': 'Ang\xc3\xa5ende art-tillh\xc3\xb6righet', } - smil_data = '\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n' - text_data = 'Jonatan \xc3\xa4r en gnu.' + smil_data = b'\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n' + text_data = b'Jonatan \xc3\xa4r en gnu.' self.assertEqual(mms.headers, headers) self.assertEqual(len(mms.data_parts), 3) self.assertEqual(mms.content_type, @@ -349,12 +350,12 @@ def test_decoding_SEC_SGHS300M(self): 'Sender-Visibility': 'Show', 'From': '', 'Read-Reply': False, 'Message-Class': 'Personal', 'Transaction-Id': '31887', 'MMS-Version': '1.0', - 'To': '303733383334353636342f545950453d504c4d4e'.decode('hex'), + 'To': binascii.unhexlify(b'303733383334353636342f545950453d504c4d4e').decode(), 'Delivery-Report': False, 'Message-Type': 'm-send-req', 'Subject': 'IL', 'Content-Type': ('application/vnd.wap.multipart.mixed', {}), } - text_data = 'HV' + text_data = b'HV' self.assertEqual(mms.headers, headers) self.assertEqual(len(mms.data_parts), 1) self.assertEqual(mms.content_type, diff --git a/messaging/test/test_sms.py b/tests/test_sms.py similarity index 83% rename from messaging/test/test_sms.py rename to tests/test_sms.py index c3ff7b5..93ee60f 100644 --- a/messaging/test/test_sms.py +++ b/tests/test_sms.py @@ -1,10 +1,7 @@ # -*- coding: utf-8 -*- from datetime import datetime, timedelta - -try: - import unittest2 as unittest -except ImportError: - import unittest +import binascii +from unittest import TestCase from messaging.sms import SmsSubmit, SmsDeliver from messaging.utils import (timedelta_to_relative_validity as to_relative, @@ -12,7 +9,7 @@ FixedOffset) -class TestEncodingFunctions(unittest.TestCase): +class TestEncodingFunctions(TestCase): def test_converting_timedelta_to_validity(self): self.assertRaises(ValueError, to_relative, timedelta(minutes=4)) @@ -52,11 +49,11 @@ def test_converting_datetime_to_validity(self): self.assertEqual(to_absolute(when, "GMT-3"), expected) -class TestSmsSubmit(unittest.TestCase): +class TestSmsSubmit(TestCase): def test_encoding_validity(self): # no validity - number = '2b3334363136353835313139'.decode('hex') + number = binascii.unhexlify(b'2b3334363136353835313139').decode() text = "hola" expected = "0001000B914316565811F9000004E8373B0C" @@ -67,7 +64,7 @@ def test_encoding_validity(self): self.assertEqual(pdu.pdu, expected) # absolute validity - number = '2b3334363136353835313139'.decode('hex') + number = binascii.unhexlify(b'2b3334363136353835313139').decode() text = "hola" expected = "0019000B914316565811F900000170520251930004E8373B0C" @@ -79,7 +76,7 @@ def test_encoding_validity(self): self.assertEqual(pdu.pdu, expected) # relative validity - number = '2b3334363136353835313139'.decode('hex') + number = binascii.unhexlify(b'2b3334363136353835313139').decode() text = "hola" expected = "0011000B914316565811F90000AA04E8373B0C" expected_len = 18 @@ -93,7 +90,7 @@ def test_encoding_validity(self): self.assertEqual(pdu.length, expected_len) def test_encoding_csca(self): - number = '2b3334363136353835313139'.decode('hex') + number = binascii.unhexlify(b'2b3334363136353835313139').decode() text = "hola" csca = "+34646456456" expected = "07914346466554F601000B914316565811F9000004E8373B0C" @@ -110,7 +107,7 @@ def test_encoding_csca(self): self.assertEqual(pdu.seq, 1) def test_encoding_class(self): - number = '2b3334363534313233343536'.decode('hex') + number = binascii.unhexlify(b'2b3334363534313233343536').decode() text = "hey yo" expected_0 = "0001000B914356143254F6001006E8721E947F03" expected_1 = "0001000B914356143254F6001106E8721E947F03" @@ -138,7 +135,7 @@ def test_encoding_class(self): def test_encoding_request_status(self): # tested with pduspy.exe and http://www.rednaxela.net/pdu.php - number = '2b3334363534313233343536'.decode('hex') + number = binascii.unhexlify(b'2b3334363534313233343536').decode() text = "hey yo" expected = "0021000B914356143254F6000006E8721E947F03" @@ -151,8 +148,8 @@ def test_encoding_request_status(self): def test_encoding_message_with_latin1_chars(self): # tested with pduspy.exe - number = '2b3334363534313233343536'.decode('hex') - text = u"Hölä" + number = binascii.unhexlify(b'2b3334363534313233343536').decode() + text = "Hölä" expected = "0011000B914356143254F60000AA04483E7B0F" sms = SmsSubmit(number, text) @@ -163,8 +160,8 @@ def test_encoding_message_with_latin1_chars(self): self.assertEqual(pdu.pdu, expected) # tested with pduspy.exe - number = '2b3334363534313233343536'.decode('hex') - text = u"BÄRÇA äñ@" + number = binascii.unhexlify(b'2b3334363534313233343536').decode() + text = "BÄRÇA äñ@" expected = "0001000B914356143254F6000009C2AD341104EDFB00" sms = SmsSubmit(number, text) @@ -188,8 +185,8 @@ def test_encoding_8bit_message(self): self.assertEqual(pdu.pdu, expected) def test_encoding_ucs2_message(self): - number = '2b3334363136353835313139'.decode('hex') - text = u'あ叶葉' + number = binascii.unhexlify(b'2b3334363136353835313139').decode() + text = 'あ叶葉' csca = '+34646456456' expected = "07914346466554F601000B914316565811F9000806304253F68449" @@ -200,8 +197,8 @@ def test_encoding_ucs2_message(self): pdu = sms.to_pdu()[0] self.assertEqual(pdu.pdu, expected) - text = u"Русский" - number = '363535333435363738'.decode('hex') + text = "Русский" + number = binascii.unhexlify(b'363535333435363738').decode() expected = "001100098156355476F80008AA0E0420044304410441043A04380439" sms = SmsSubmit(number, text) @@ -214,7 +211,7 @@ def test_encoding_ucs2_message(self): def test_encoding_multipart_7bit(self): # text encoded with umts-tools text = "Or walk with Kings - nor lose the common touch, if neither foes nor loving friends can hurt you, If all men count with you, but none too much; If you can fill the unforgiving minute With sixty seconds' worth of distance run, Yours is the Earth and everything thats in it, And - which is more - you will be a Man, my son" - number = '363535333435363738'.decode('hex') + number = binascii.unhexlify(b'363535333435363738').decode() expected = [ "005100098156355476F80000AAA00500038803019E72D03DCC5E83EE693A1AB44CBBCF73500BE47ECB41ECF7BC0CA2A3CBA0F1BBDD7EBB41F4777D8C6681D26690BB9CA6A3CB7290F95D9E83DC6F3988FDB6A7DD6790599E2EBBC973D038EC06A1EB723A28FFAEB340493328CC6683DA653768FCAEBBE9A07B9A8E06E5DF7516485CA783DC6F7719447FBF41EDFA18BD0325CDA0FCBB0E1A87DD", "005100098156355476F80000AAA005000388030240E6349B0DA2A3CBA0BADBFC969FD3F6B4FB0C6AA7DD757A19744DD3D1A0791A4FCF83E6E5F1DB4D9E9F40F7B79C8E06BDCD20727A4E0FBBC76590BCEE6681B2EFBA7C0E4ACF41747419540CCBE96850D84D0695ED65799E8E4EBBCF203A3A4C9F83D26E509ACE0205DD64500B7447A7C768507A0E6ABFE565500B947FD741F7349B0D129741", @@ -233,6 +230,29 @@ def test_encoding_multipart_7bit(self): self.assertEqual(pdu.seq, i + 1) self.assertEqual(pdu.cnt, cnt) + def test_encoding_multipart_7bit_egsm(self): + # text encoded with umts-tools + self.maxDiff = None + text = '€' * 229 + 'x' + number = binascii.unhexlify(b'363535333435363738').decode() + expected = [ + "005100098156355476F80000AAA005000388030136E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437", + "005100098156355476F80000AAA0050003880302CA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA9BF2A6BC296FCA", + "005100098156355476F80000AAA005000388030336E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE9437E54D7953DE94F1", + ] + + sms = SmsSubmit(number, text) + sms.ref = 0x0 + sms.rand_id = 136 + sms.validity = timedelta(days=4) + + ret = sms.to_pdu() + cnt = len(ret) + for i, pdu in enumerate(ret): + self.assertEqual(pdu.pdu, expected[i]) + self.assertEqual(pdu.seq, i + 1) + self.assertEqual(pdu.cnt, cnt) + def test_encoding_bad_number_raises_error(self): self.assertRaises(ValueError, SmsSubmit, "032BADNUMBER", "text") @@ -241,12 +261,12 @@ def test_encoding_bad_csca_raises_error(self): self.assertRaises(ValueError, setattr, sms, 'csca', "1badcsca") -class TestSubmitPduCounts(unittest.TestCase): +class TestSubmitPduCounts(TestCase): DEST = "+3530000000" GSM_CHAR = "x" - EGSM_CHAR = u"€" - UNICODE_CHAR = u"ő" + EGSM_CHAR = "€" + UNICODE_CHAR = "ő" def test_gsm_1(self): sms = SmsSubmit(self.DEST, self.GSM_CHAR * 160) @@ -285,11 +305,16 @@ def test_egsm_2(self): def test_egsm_3(self): sms = SmsSubmit(self.DEST, self.EGSM_CHAR * 153) # 306 septets - self.assertEqual(len(sms.to_pdu()), 3) + self.assertEqual(len(sms.to_pdu()), 2) def test_egsm_4(self): sms = SmsSubmit(self.DEST, self.EGSM_CHAR * 229 + self.GSM_CHAR) # 459 septets + self.assertEqual(len(sms.to_pdu()), 3) + + def test_egsm_5(self): + sms = SmsSubmit(self.DEST, + self.EGSM_CHAR * 270 + self.GSM_CHAR) # 541 septets self.assertEqual(len(sms.to_pdu()), 4) def test_unicode_1(self): @@ -317,13 +342,13 @@ def test_unicode_6(self): self.assertEqual(len(sms.to_pdu()), 4) -class TestSmsDeliver(unittest.TestCase): +class TestSmsDeliver(TestCase): def test_decoding_7bit_pdu(self): pdu = "07911326040000F0040B911346610089F60000208062917314080CC8F71D14969741F977FD07" text = "How are you?" csca = "+31624000000" - number = '2b3331363431363030393836'.decode('hex') + number = binascii.unhexlify(b'2b3331363431363030393836').decode() sms = SmsDeliver(pdu) self.assertEqual(sms.text, text) @@ -332,9 +357,9 @@ def test_decoding_7bit_pdu(self): def test_decoding_ucs2_pdu(self): pdu = "07914306073011F0040B914316709807F2000880604290224080084E2D5174901A8BAF" - text = u"中兴通讯" + text = "中兴通讯" csca = "+34607003110" - number = '2b3334363130373839373032'.decode('hex') + number = binascii.unhexlify(b'2b3334363130373839373032').decode() sms = SmsDeliver(pdu) self.assertEqual(sms.text, text) @@ -345,7 +370,7 @@ def test_decoding_7bit_pdu_data(self): pdu = "07911326040000F0040B911346610089F60000208062917314080CC8F71D14969741F977FD07" text = "How are you?" csca = "+31624000000" - number = '2b3331363431363030393836'.decode('hex') + number = binascii.unhexlify(b'2b3331363431363030393836').decode() data = SmsDeliver(pdu).data self.assertEqual(data['text'], text) @@ -358,7 +383,7 @@ def test_decoding_7bit_pdu_data(self): def test_decoding_datetime_gmtplusone(self): pdu = "0791447758100650040C914497716247010000909010711423400A2050EC468B81C4733A" text = " 1741 bst" - number = '2b343437393137323637343130'.decode('hex') + number = binascii.unhexlify(b'2b343437393137323637343130').decode() date = datetime(2009, 9, 1, 16, 41, 32) sms = SmsDeliver(pdu) @@ -416,8 +441,8 @@ def test_decode_weird_multipart_german_pdu(self): "07919471227210244405852122F039F1015062712181804F050003190202E4E8309B5E7683DAFC319A5E76B340F73D9A5D7683A6E93268FD9ED3CB6EF67B0E5AD172B19B2C2693C9602E90355D6683A6F0B007946E8382F5393BEC26BB00", ] texts = [ - u"Lieber Vodafone-Kunde, mit Ihrer nationalen Tarifoption zahlen Sie in diesem Netz 3,45 € pro MB plus 59 Ct pro Session. Wenn Sie diese Info nicht mehr e", - u"rhalten möchten, wählen Sie kostenlos +4917212220. Viel Spaß im Ausland.", + "Lieber Vodafone-Kunde, mit Ihrer nationalen Tarifoption zahlen Sie in diesem Netz 3,45 € pro MB plus 59 Ct pro Session. Wenn Sie diese Info nicht mehr e", + "rhalten möchten, wählen Sie kostenlos +4917212220. Viel Spaß im Ausland.", ] for i, sms in enumerate(map(SmsDeliver, pdus)): @@ -464,7 +489,7 @@ def test_decoding_delivery_status_report_without_smsc_address(self): } sms = SmsDeliver(pdu) - self.assertEqual(sms.csca, None) + self.assertIsNone(sms.csca) data = sms.data self.assertEqual(data['ref'], 5) self.assertEqual(sms.sr, sr) diff --git a/messaging/test/test_udh.py b/tests/test_udh.py similarity index 71% rename from messaging/test/test_udh.py rename to tests/test_udh.py index 9496ff6..eab5b5e 100644 --- a/messaging/test/test_udh.py +++ b/tests/test_udh.py @@ -1,13 +1,13 @@ -import unittest +from unittest import TestCase from messaging.sms.udh import UserDataHeader -from messaging.utils import to_array +from messaging.utils import hex_to_int_array -class TestUserDataHeader(unittest.TestCase): +class TestUserDataHeader(TestCase): def test_user_data_header(self): - data = to_array("08049f8e020105040b8423f0") + data = hex_to_int_array("08049f8e020105040b8423f0") udh = UserDataHeader.from_bytes(data) self.assertEqual(udh.concat.seq, 1) @@ -16,7 +16,7 @@ def test_user_data_header(self): self.assertEqual(udh.ports.dest_port, 2948) self.assertEqual(udh.ports.orig_port, 9200) - data = to_array("0003190201") + data = hex_to_int_array("0003190201") udh = UserDataHeader.from_bytes(data) self.assertEqual(udh.concat.seq, 1) diff --git a/messaging/test/test_wap.py b/tests/test_wap.py similarity index 88% rename from messaging/test/test_wap.py rename to tests/test_wap.py index 84224a1..f941e75 100644 --- a/messaging/test/test_wap.py +++ b/tests/test_wap.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- from array import array -import unittest +from unittest import TestCase +import binascii from messaging.sms import SmsDeliver from messaging.sms.wap import (is_a_wap_push_notification as is_push, @@ -10,10 +11,10 @@ def list_to_str(l): a = array("B", l) - return a.tostring() + return a.tobytes() -class TestSmsWapPush(unittest.TestCase): +class TestSmsWapPush(TestCase): data = [1, 6, 34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 118, 110, 100, 46, 119, 97, 112, 46, 109, 109, 115, 45, @@ -39,9 +40,9 @@ def test_decoding_m_notification_ind(self): "0791447758100650400E80885810000000810004016082415464408C0C08049F8E020105040B8423F00106226170706C69636174696F6E2F766E642E7761702E6D6D732D6D65737361676500AF848C82984E4F4B3543694B636F544D595347344D4253774141734B7631344655484141414141414141008D908919802B3434373738353334323734392F545950453D504C4D4E008A808E0274008805810301194083687474703A2F", "0791447758100650440E8088581000000081000401608241547440440C08049F8E020205040B8423F02F70726F6D6D732F736572766C6574732F4E4F4B3543694B636F544D595347344D4253774141734B763134465548414141414141414100", ] - number = '3838383530313030303030303138'.decode('hex') + number = binascii.unhexlify(b'3838383530313030303030303138').decode() csca = "+447785016005" - data = "" + data = b"" sms = SmsDeliver(pdus[0]) self.assertEqual(sms.udh.concat.ref, 40846) @@ -66,7 +67,7 @@ def test_decoding_m_notification_ind(self): 'NOK5CiKcoTMYSG4MBSwAAsKv14FUHAAAAAAAA') self.assertEqual(mms.headers['MMS-Version'], '1.0') self.assertEqual(mms.headers['From'], - '2b3434373738353334323734392f545950453d504c4d4e'.decode('hex')) + binascii.unhexlify(b'2b3434373738353334323734392f545950453d504c4d4e').decode()) self.assertEqual(mms.headers['Message-Class'], 'Personal') self.assertEqual(mms.headers['Message-Size'], 29696) self.assertEqual(mms.headers['Expiry'], 72000) @@ -79,7 +80,7 @@ def test_decoding_m_notification_ind(self): ] number = "88850100000008" - data = "" + data = b"" sms = SmsDeliver(pdus[0]) self.assertEqual(sms.udh.concat.ref, 57299) @@ -103,7 +104,7 @@ def test_decoding_m_notification_ind(self): 'NOK5A1ZdFTMYSG4O3VQAAsJv94GoNAAAAAAAA') self.assertEqual(mms.headers['MMS-Version'], '1.0') self.assertEqual(mms.headers['From'], - '2b3434373731373237353034392f545950453d504c4d4e'.decode('hex')) + binascii.unhexlify(b'2b3434373731373237353034392f545950453d504c4d4e').decode()) self.assertEqual(mms.headers['Message-Class'], 'Personal') self.assertEqual(mms.headers['Message-Size'], 29696) self.assertEqual(mms.headers['Expiry'], 259199) @@ -115,9 +116,9 @@ def test_decoding_generic_wap_push(self): "0791947122725014440C8500947122921105F5112042519582408C0B05040B8423F0000396020101060B03AE81EAC3958D01A2B48403056A0A20566F6461666F6E650045C60C037761702E6D65696E63616C6C79612E64652F000801035A756D206B6F7374656E6C6F73656E20506F7274616C20224D65696E0083000322202D2065696E66616368206175662064656E20666F6C67656E64656E204C696E6B206B6C69636B656E", "0791947122725014440C8500947122921105F5112042519592403C0B05040B8423F00003960202206F6465722064696520536569746520646972656B7420617566727566656E2E2049687200830003205465616D000101", ] - number = '303034393137323232393131'.decode('hex') + number = binascii.unhexlify(b'303034393137323232393131').decode() csca = "+491722270541" - data = "" + data = b"" sms = SmsDeliver(pdus[0]) self.assertEqual(sms.udh.concat.ref, 150) @@ -134,7 +135,7 @@ def test_decoding_generic_wap_push(self): self.assertEqual(sms.number, number) data += sms.text - self.assertEqual(data, '\x01\x06\x0b\x03\xae\x81\xea\xc3\x95\x8d\x01\xa2\xb4\x84\x03\x05j\n Vodafone\x00E\xc6\x0c\x03wap.meincallya.de/\x00\x08\x01\x03Zum kostenlosen Portal "Mein\x00\x83\x00\x03" - einfach auf den folgenden Link klicken oder die Seite direkt aufrufen. Ihr\x00\x83\x00\x03 Team\x00\x01\x01') + self.assertEqual(data, b'\x01\x06\x0b\x03\xae\x81\xea\xc3\x95\x8d\x01\xa2\xb4\x84\x03\x05j\n Vodafone\x00E\xc6\x0c\x03wap.meincallya.de/\x00\x08\x01\x03Zum kostenlosen Portal "Mein\x00\x83\x00\x03" - einfach auf den folgenden Link klicken oder die Seite direkt aufrufen. Ihr\x00\x83\x00\x03 Team\x00\x01\x01') push = extract_push_notification(data) self.assertEqual(is_mms_notification(push), False)