Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PYTHON-1350 Store IV along with encrypted text when using column-level encryption #1160

Merged
merged 6 commits into from
Jun 2, 2023
40 changes: 24 additions & 16 deletions cassandra/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -1257,15 +1257,22 @@ def encode_and_encrypt(self, coldesc, obj):

class AES256ColumnEncryptionPolicy(ColumnEncryptionPolicy):

# CBC uses an IV that's the same size as the block size
#
# TODO: Need to find some way to expose mode options
# (CBC etc.) without leaking classes from the underlying
# impl here
def __init__(self, mode = modes.CBC, iv = os.urandom(AES256_BLOCK_SIZE_BYTES)):

self.mode = mode
# Fix block cipher mode for now. IV size is a function of block cipher used
# so fixing this avoids (possibly unnecessary) validation logic here.
mode = modes.CBC
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A change from the prior implementation: we fix the block cipher mode now. Doing so allows us to make clear assumptions about the expected size of the IV. We certainly can get around this other ways, but honestly the way we were previously exposing the block cipher mode wasn't great; it leaked the implementation badly since users who wanted to supply a non-default value had to do so via types from the cryptography module.

We can re-introduce the idea of making this configurable if this becomes an issue, but for an initial pass it seemed reasonably sane to constrain this in order to support ease of use. I'm certainly open to counter-arguments, though.


def __init__(self, iv=None):

# CBC uses an IV that's the same size as the block size
#
# Avoid defining IV with a default arg in order to stay away from
# any issues around the caching of default args
self.iv = iv
if self.iv:
if not len(self.iv) == AES256_BLOCK_SIZE_BYTES:
raise ValueError("AES256 column encryption policy uses CBC mode and therefore expects a 128-bit initialization vector")
else:
self.iv = os.urandom(AES256_BLOCK_SIZE_BYTES)

# ColData for a given ColDesc is always preserved. We only create a Cipher
# when there's an actual need to for a given ColDesc
Expand All @@ -1286,11 +1293,13 @@ def encrypt(self, coldesc, obj_bytes):

cipher = self._get_cipher(coldesc)
encryptor = cipher.encryptor()
return encryptor.update(padded_bytes) + encryptor.finalize()
return self.iv + encryptor.update(padded_bytes) + encryptor.finalize()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it should include the size of the IV as a prefix to the IV?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an excellent idea. We don't need it for this impl given that we're only supporting AES-256 and the IV for that cipher is a fixed size... but yeah, other impls will have different requirements. I'll add that in... nice catch!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at this again I'm very much on the fence about whether we need to include the IV size here.

The IV size is a function of the block chaining mode and all modes supported by the current version of cryptography (with one exception) use an IV size that's equal to the block size of the cipher. We're in the impl for AES-256 so we know the block size will be 16 bytes. We might wind up with other policy impls based on other ciphers but with respect to the internals of this policy (which is the scope we're dealing with here) 16 bytes seems like a reasonable assumption. Only way that breaks down would seem to be one of the following:

  • We somehow make the mode configurable again and don't exclude GCM
  • We somehow make the mode configurable again and cryptography subsequently releases new versions which add some number of new modes which make use of IVs of different sizes and we don't exclude the new modes

These scenarios are not implausible necessarily, but I guess the question is whether we want to add a byte or two [1] to every encrypted value to future-proof them against changes like these. On the one hand we just added 16 bytes to each of these values to preserve the IV so a few more bytes doesn't seem like a big jump. On the other hand we could add flexibility while still guarding against the failure cases above by being more cautious if and when we make the mode configurable. For example, just white-listing the modes which use block-size IVs. We could expand that to, say, all modes that support an IV of 256 bytes or less if we want to add an additional byte for IV length but that set doesn't differ much from the set of modes that support an IV equal to the block size.

Thoughts?

[1] We'd actually need a lot more than this if we wanted full coverage; GCM can support IVs up to 2^64 - 1 bits

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're in the impl for AES-256 so we know the block size will be 16 bytes.

The only reason I can think of to do it is to fail fast if it's wrong, which means someone encrypted the data using a different class than AES-256. So maybe it is not worth adding bytes only to detect misconfigurations. I mean the users is going to get garbage out, or a decryption error out, if they did use some other function to encrypt the data. So maybe that is enough to clue them in their configuration is wrong.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm inclined to agree, I don't think using length as an indicator of whether we used the same impl to encrypt the data works. AES is a great example. AES-256 uses a 16 byte block size, but so does AES-128. A 1 byte IV size value would thus be reasonable for both (as discussed above) so it wouldn't give us any indicator as to whether we should use one or the other for a given blob.

There's perhaps a separate question there about whether we should provide some additional flags there, maybe a magic number of some kind to indicate which algorithm a given blob was encoded with. But since we have exactly one column encryption policy impl at the moment I'm not sure we know enough to pick the right design.

So I think we're saying go with what's in the PR now (IV in the blob, no leading IV size byte). Sound right to you @JeremiahDJordan ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah


def decrypt(self, coldesc, encrypted_bytes):
def decrypt(self, coldesc, bytes):

cipher = self._get_cipher(coldesc)
iv = bytes[:AES256_BLOCK_SIZE_BYTES]
encrypted_bytes = bytes[AES256_BLOCK_SIZE_BYTES:]
cipher = self._get_cipher(coldesc, iv=iv)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth being explicit about this: the IV supplied via the policy constructor is never used for decrypted read values. We only use that when writing new data. Seems obvious given how the code is organized but may not be obvious to users... perhaps we should more explicitly call this out?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be fine.

decryptor = cipher.decryptor()
padded_bytes = decryptor.update(encrypted_bytes) + decryptor.finalize()

Expand Down Expand Up @@ -1330,19 +1339,18 @@ def cache_info(self):
def column_type(self, coldesc):
return self.coldata[coldesc].type

def _get_cipher(self, coldesc):
def _get_cipher(self, coldesc, iv=None):
"""
Access relevant state from this instance necessary to create a Cipher and then get one,
hopefully returning a cached instance if we've already done so (and it hasn't been evicted)
"""

try:
coldata = self.coldata[coldesc]
return AES256ColumnEncryptionPolicy._build_cipher(coldata.key, self.mode, self.iv)
return AES256ColumnEncryptionPolicy._build_cipher(coldata.key, iv or self.iv)
except KeyError:
raise ValueError("Could not find column {}".format(coldesc))

# Explicitly use a class method here to avoid caching self
@lru_cache(maxsize=128)
def _build_cipher(key, mode, iv):
return Cipher(algorithms.AES256(key), mode(iv))
def _build_cipher(key, iv):
return Cipher(algorithms.AES256(key), AES256ColumnEncryptionPolicy.mode(iv))
71 changes: 57 additions & 14 deletions tests/integration/standard/test_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

from decimal import Decimal
import os
import random
import unittest

from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, SimpleConvictionPolicy, \
WhiteListRoundRobinPolicy, ColDesc, AES256ColumnEncryptionPolicy, AES256_KEY_SIZE_BYTES
WhiteListRoundRobinPolicy, ColDesc, AES256ColumnEncryptionPolicy, AES256_KEY_SIZE_BYTES, \
AES256_BLOCK_SIZE_BYTES
from cassandra.pool import Host
from cassandra.connection import DefaultEndPoint

Expand Down Expand Up @@ -101,25 +101,29 @@ def _recreate_keyspace(self, session):
session.execute("CREATE KEYSPACE foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}")
session.execute("CREATE TABLE foo.bar(encrypted blob, unencrypted int, primary key(unencrypted))")

def _create_policy(self, key, iv = None):
cl_policy = AES256ColumnEncryptionPolicy()
col_desc = ColDesc('foo','bar','encrypted')
cl_policy.add_column(col_desc, key, "int")
return (col_desc, cl_policy)

def test_end_to_end_prepared(self):

# We only currently perform testing on a single type/expected value pair since CLE functionality is essentially
# independent of the underlying type. We intercept data after it's been encoded when it's going out and before it's
# encoded when coming back; the actual types of the data involved don't impact us.
expected = 12345
expected_type = "int"
expected = 0

key = os.urandom(AES256_KEY_SIZE_BYTES)
cl_policy = AES256ColumnEncryptionPolicy()
col_desc = ColDesc('foo','bar','encrypted')
cl_policy.add_column(col_desc, key, expected_type)

(_, cl_policy) = self._create_policy(key)
cluster = TestCluster(column_encryption_policy=cl_policy)
session = cluster.connect()
self._recreate_keyspace(session)

prepared = session.prepare("insert into foo.bar (encrypted, unencrypted) values (?,?)")
session.execute(prepared, (expected,expected))
for i in range(100):
session.execute(prepared, (i,i))

# A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted
# values here to confirm that we don't interfere with regular processing of unencrypted vals.
Expand All @@ -135,20 +139,20 @@ def test_end_to_end_prepared(self):

def test_end_to_end_simple(self):

expected = 67890
expected_type = "int"
expected = 1

key = os.urandom(AES256_KEY_SIZE_BYTES)
cl_policy = AES256ColumnEncryptionPolicy()
col_desc = ColDesc('foo','bar','encrypted')
cl_policy.add_column(col_desc, key, expected_type)

(col_desc, cl_policy) = self._create_policy(key)
cluster = TestCluster(column_encryption_policy=cl_policy)
session = cluster.connect()
self._recreate_keyspace(session)

# Use encode_and_encrypt helper function to populate date
session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)",(cl_policy.encode_and_encrypt(col_desc, expected), expected))
for i in range(1,100):
self.assertIsNotNone(i)
encrypted = cl_policy.encode_and_encrypt(col_desc, i)
session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i))

# A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted
# values here to confirm that we don't interfere with regular processing of unencrypted vals.
Expand All @@ -161,3 +165,42 @@ def test_end_to_end_simple(self):
(encrypted,unencrypted) = session.execute(prepared, [expected]).one()
self.assertEquals(expected, encrypted)
self.assertEquals(expected, unencrypted)

def test_end_to_end_different_cle_contexts(self):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When run manually this test fails (with the expected errors) without the rest of the changes in this PR and passes with them.


expected = 2

key = os.urandom(AES256_KEY_SIZE_BYTES)

# Simulate the creation of two AES256 policies at two different times. Python caches
# default param args at function definition time so a single value will be used any time
# the default val is used. Upshot is that within the same test we'll always have the same
# IV if we rely on the default args, so manually introduce some variation here to simulate
# what actually happens if you have two distinct sessions created at two different times.
iv1 = os.urandom(AES256_BLOCK_SIZE_BYTES)
(col_desc1, cl_policy1) = self._create_policy(key, iv=iv1)
cluster1 = TestCluster(column_encryption_policy=cl_policy1)
session1 = cluster1.connect()
self._recreate_keyspace(session1)

# Use encode_and_encrypt helper function to populate date
for i in range(1,100):
self.assertIsNotNone(i)
encrypted = cl_policy1.encode_and_encrypt(col_desc1, i)
session1.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i))
session1.shutdown()
cluster1.shutdown()

# Explicitly clear the class-level cache here; we're trying to simulate a second connection from a completely new process and
# that would entail not re-using any cached ciphers
AES256ColumnEncryptionPolicy._build_cipher.cache_clear()
cache_info = cl_policy1.cache_info()
self.assertEqual(cache_info.currsize, 0)

iv2 = os.urandom(AES256_BLOCK_SIZE_BYTES)
(_, cl_policy2) = self._create_policy(key, iv=iv2)
cluster2 = TestCluster(column_encryption_policy=cl_policy2)
session2 = cluster2.connect()
(encrypted,unencrypted) = session2.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one()
self.assertEquals(expected, encrypted)
self.assertEquals(expected, unencrypted)
21 changes: 21 additions & 0 deletions tests/unit/test_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,23 @@ def test_add_column_invalid_key_size_raises(self):
with self.assertRaises(ValueError):
policy.add_column(coldesc, os.urandom(key_size), "blob")

def test_add_column_invalid_iv_size_raises(self):
def test_iv_size(iv_size):
policy = AES256ColumnEncryptionPolicy(iv = os.urandom(iv_size))
policy.add_column(coldesc, os.urandom(AES256_KEY_SIZE_BYTES), "blob")
policy.encrypt(coldesc, os.urandom(128))

coldesc = ColDesc('ks1','table1','col1')
for iv_size in range(1,AES256_BLOCK_SIZE_BYTES - 1):
with self.assertRaises(ValueError):
test_iv_size(iv_size)
for iv_size in range(AES256_BLOCK_SIZE_BYTES + 1,(2 * AES256_BLOCK_SIZE_BYTES) - 1):
with self.assertRaises(ValueError):
test_iv_size(iv_size)

# Finally, confirm that the expected IV size has no issue
test_iv_size(AES256_BLOCK_SIZE_BYTES)

def test_add_column_null_coldesc_raises(self):
with self.assertRaises(ValueError):
policy = AES256ColumnEncryptionPolicy()
Expand Down Expand Up @@ -1610,6 +1627,10 @@ def test_decrypt_unknown_column(self):
policy.decrypt(ColDesc('ks2','table2','col2'), encrypted_bytes)

def test_cache_info(self):

# Exclude any interference from tests above
AES256ColumnEncryptionPolicy._build_cipher.cache_clear()

coldesc1 = ColDesc('ks1','table1','col1')
coldesc2 = ColDesc('ks2','table2','col2')
coldesc3 = ColDesc('ks3','table3','col3')
Expand Down