
Support ADLS with Pyarrow file IO #2111


Merged
24 changes: 14 additions & 10 deletions mkdocs/docs/configuration.md
@@ -146,16 +146,20 @@ For the FileIO there are several configuration options available:

<!-- markdown-link-check-disable -->

| Key | Example | Description |
|------------------------------|---------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| adls.connection-string | AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqF...;BlobEndpoint=<http://localhost/> | A [connection string](https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string). This can be used to point FileIO at any ADLS-compatible object storage service with a different endpoint (such as [azurite](https://github.com/azure/azurite)). |
| adls.account-name | devstoreaccount1 | The account that you want to connect to |
| adls.account-key | Eby8vdM02xNOcqF... | The key used to authenticate against the account. |
| adls.sas-token | NuHOuuzdQN7VRM%2FOpOeqBlawRCA845IY05h9eu1Yte4%3D | The shared access signature |
| adls.tenant-id | ad667be4-b811-11ed-afa1-0242ac120002 | The tenant-id |
| adls.client-id | ad667be4-b811-11ed-afa1-0242ac120002 | The client-id |
| adls.client-secret | oCA3R6P\*ka#oa1Sms2J74z... | The client-secret |
| adls.account-host | accountname1.blob.core.windows.net | The storage account host. See [AzureBlobFileSystem](https://github.com/fsspec/adlfs/blob/adb9c53b74a0d420625b86dd00fbe615b43201d2/adlfs/spec.py#L125) for reference |
| adls.blob-storage-authority | .blob.core.windows.net | The hostname[:port] of the Blob Service. Defaults to `.blob.core.windows.net`. Useful for connecting to a local emulator, like [azurite](https://github.com/azure/azurite). See [AzureFileSystem](https://arrow.apache.org/docs/python/filesystems.html#azure-storage-file-system) for reference |
| adls.dfs-storage-authority | .dfs.core.windows.net | The hostname[:port] of the Data Lake Gen 2 Service. Defaults to `.dfs.core.windows.net`. Useful for connecting to a local emulator, like [azurite](https://github.com/azure/azurite). See [AzureFileSystem](https://arrow.apache.org/docs/python/filesystems.html#azure-storage-file-system) for reference |
| adls.blob-storage-scheme | https | Either `http` or `https`. Defaults to `https`. Useful for connecting to a local emulator, like [azurite](https://github.com/azure/azurite). See [AzureFileSystem](https://arrow.apache.org/docs/python/filesystems.html#azure-storage-file-system) for reference |
| adls.dfs-storage-scheme | https | Either `http` or `https`. Defaults to `https`. Useful for connecting to a local emulator, like [azurite](https://github.com/azure/azurite). See [AzureFileSystem](https://arrow.apache.org/docs/python/filesystems.html#azure-storage-file-system) for reference |

<!-- markdown-link-check-enable-->
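These properties can also be supplied programmatically when constructing a FileIO. Below is a minimal sketch that points the PyArrow-backed FileIO at a local [azurite](https://github.com/azure/azurite) emulator; it requires `pyarrow >= 20.0.0`, the account name and key are azurite's development defaults, and the authority, scheme, container, and path are placeholders:

```python
from pyiceberg.io.pyarrow import PyArrowFileIO

# Sketch: ADLS-compatible FileIO backed by a local Azurite emulator.
# The authority (host:port), scheme, container, and blob path are placeholders.
io = PyArrowFileIO(
    properties={
        "adls.account-name": "devstoreaccount1",
        "adls.account-key": "Eby8vdM02xNOcqF...",
        "adls.blob-storage-authority": "127.0.0.1:10000",
        "adls.dfs-storage-authority": "127.0.0.1:10000",
        "adls.blob-storage-scheme": "http",
        "adls.dfs-storage-scheme": "http",
    }
)

# Read the first bytes of a blob in a container named "warehouse".
with io.new_input("abfss://warehouse/some/path/data.parquet").open() as f:
    header = f.read(4)
```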

4 changes: 4 additions & 0 deletions pyiceberg/io/__init__.py
@@ -82,6 +82,10 @@
ADLS_CLIENT_ID = "adls.client-id"
ADLS_CLIENT_SECRET = "adls.client-secret"
ADLS_ACCOUNT_HOST = "adls.account-host"
ADLS_BLOB_STORAGE_AUTHORITY = "adls.blob-storage-authority"
ADLS_DFS_STORAGE_AUTHORITY = "adls.dfs-storage-authority"
ADLS_BLOB_STORAGE_SCHEME = "adls.blob-storage-scheme"
ADLS_DFS_STORAGE_SCHEME = "adls.dfs-storage-scheme"
GCS_TOKEN = "gcs.oauth2.token"
GCS_TOKEN_EXPIRES_AT_MS = "gcs.oauth2.token-expires-at"
GCS_PROJECT_ID = "gcs.project-id"
47 changes: 47 additions & 0 deletions pyiceberg/io/pyarrow.py
@@ -85,6 +85,13 @@
)
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
from pyiceberg.io import (
ADLS_ACCOUNT_KEY,
ADLS_ACCOUNT_NAME,
ADLS_BLOB_STORAGE_AUTHORITY,
ADLS_BLOB_STORAGE_SCHEME,
ADLS_DFS_STORAGE_AUTHORITY,
ADLS_DFS_STORAGE_SCHEME,
ADLS_SAS_TOKEN,
AWS_ACCESS_KEY_ID,
AWS_REGION,
AWS_ROLE_ARN,
@@ -394,6 +401,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
elif scheme in {"gs", "gcs"}:
return self._initialize_gcs_fs()

elif scheme in {"abfs", "abfss", "wasb", "wasbs"}:

Contributor: I can't find any pyarrow docs that indicate wasb and wasbs are supported.

return self._initialize_azure_fs()

elif scheme in {"file"}:
return self._initialize_local_fs()

@@ -475,6 +485,43 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:

return S3FileSystem(**client_kwargs)

def _initialize_azure_fs(self) -> FileSystem:
from packaging import version

MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS = "20.0.0"
if version.parse(pyarrow.__version__) < version.parse(MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS):
raise ImportError(
f"pyarrow version >= {MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS} required for AzureFileSystem support, "
f"but found version {pyarrow.__version__}."
)

from pyarrow.fs import AzureFileSystem

client_kwargs: Dict[str, str] = {}

if account_name := self.properties.get(ADLS_ACCOUNT_NAME):
client_kwargs["account_name"] = account_name

if account_key := self.properties.get(ADLS_ACCOUNT_KEY):
client_kwargs["account_key"] = account_key

if blob_storage_authority := self.properties.get(ADLS_BLOB_STORAGE_AUTHORITY):
client_kwargs["blob_storage_authority"] = blob_storage_authority

if dfs_storage_authority := self.properties.get(ADLS_DFS_STORAGE_AUTHORITY):
client_kwargs["dfs_storage_authority"] = dfs_storage_authority

if blob_storage_scheme := self.properties.get(ADLS_BLOB_STORAGE_SCHEME):
client_kwargs["blob_storage_scheme"] = blob_storage_scheme

if dfs_storage_scheme := self.properties.get(ADLS_DFS_STORAGE_SCHEME):
client_kwargs["dfs_storage_scheme"] = dfs_storage_scheme

if sas_token := self.properties.get(ADLS_SAS_TOKEN):
client_kwargs["sas_token"] = sas_token

return AzureFileSystem(**client_kwargs)

def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
from pyarrow.fs import HadoopFileSystem

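For reference, `_initialize_azure_fs` forwards the recognized `adls.*` properties as keyword arguments to pyarrow's `AzureFileSystem`, so with every property set the construction is roughly equivalent to the sketch below (placeholder values for a local azurite emulator; requires `pyarrow >= 20.0.0`):

```python
from pyarrow.fs import AzureFileSystem

# Roughly what _initialize_azure_fs builds when all adls.* properties are set.
# Values are placeholders for a local Azurite emulator.
fs = AzureFileSystem(
    account_name="devstoreaccount1",
    account_key="Eby8vdM02xNOcqF...",
    blob_storage_authority="127.0.0.1:10000",
    dfs_storage_authority="127.0.0.1:10000",
    blob_storage_scheme="http",
    dfs_storage_scheme="http",
)

# Paths are addressed as "<container>/<blob>"; "warehouse" is a hypothetical container.
info = fs.get_file_info("warehouse/some/path/data.parquet")
```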
79 changes: 59 additions & 20 deletions tests/conftest.py
@@ -52,6 +52,12 @@
from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.expressions import BoundReference
from pyiceberg.io import (
ADLS_ACCOUNT_KEY,
ADLS_ACCOUNT_NAME,
ADLS_BLOB_STORAGE_AUTHORITY,
ADLS_BLOB_STORAGE_SCHEME,
ADLS_DFS_STORAGE_AUTHORITY,
ADLS_DFS_STORAGE_SCHEME,
GCS_PROJECT_ID,
GCS_SERVICE_HOST,
GCS_TOKEN,
@@ -348,6 +354,11 @@ def table_schema_with_all_types() -> Schema:
)


@pytest.fixture(params=["abfs", "abfss", "wasb", "wasbs"])
def adls_scheme(request: pytest.FixtureRequest) -> str:
return request.param


@pytest.fixture(scope="session")
def pyarrow_schema_simple_without_ids() -> "pa.Schema":
import pyarrow as pa
@@ -2088,6 +2099,26 @@ def fsspec_fileio_gcs(request: pytest.FixtureRequest) -> FsspecFileIO:
return fsspec.FsspecFileIO(properties=properties)


@pytest.fixture
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
from azure.storage.blob import BlobServiceClient

azurite_url = request.config.getoption("--adls.endpoint")
azurite_account_name = request.config.getoption("--adls.account-name")
azurite_account_key = request.config.getoption("--adls.account-key")
azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
properties = {
"adls.connection-string": azurite_connection_string,
"adls.account-name": azurite_account_name,
}

bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
bbs.create_container("tests")
yield fsspec.FsspecFileIO(properties=properties)
bbs.delete_container("tests")
bbs.close()


@pytest.fixture
def pyarrow_fileio_gcs(request: pytest.FixtureRequest) -> "PyArrowFileIO":
from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -2101,6 +2132,34 @@ def pyarrow_fileio_gcs(request: pytest.FixtureRequest) -> "PyArrowFileIO":
return PyArrowFileIO(properties=properties)


@pytest.fixture
def pyarrow_fileio_adls(request: pytest.FixtureRequest) -> Generator[Any, None, None]:
from azure.storage.blob import BlobServiceClient

from pyiceberg.io.pyarrow import PyArrowFileIO

azurite_url = request.config.getoption("--adls.endpoint")
azurite_scheme, azurite_authority = azurite_url.split("://", 1)

azurite_account_name = request.config.getoption("--adls.account-name")
azurite_account_key = request.config.getoption("--adls.account-key")
azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
properties = {
ADLS_ACCOUNT_NAME: azurite_account_name,
ADLS_ACCOUNT_KEY: azurite_account_key,
ADLS_BLOB_STORAGE_AUTHORITY: azurite_authority,
ADLS_DFS_STORAGE_AUTHORITY: azurite_authority,
ADLS_BLOB_STORAGE_SCHEME: azurite_scheme,
ADLS_DFS_STORAGE_SCHEME: azurite_scheme,
}

bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
bbs.create_container("warehouse")
yield PyArrowFileIO(properties=properties)
bbs.delete_container("warehouse")
bbs.close()


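A round-trip test built on the `pyarrow_fileio_adls` and `adls_scheme` fixtures above could look like the following sketch (the `warehouse` container matches the fixture; the test name, file name, and payload are illustrative):

```python
def test_adls_round_trip(pyarrow_fileio_adls: "PyArrowFileIO", adls_scheme: str) -> None:
    # Write a small payload to the Azurite-backed "warehouse" container, then read it back.
    location = f"{adls_scheme}://warehouse/round_trip.txt"
    with pyarrow_fileio_adls.new_output(location).create() as out:
        out.write(b"hello")
    with pyarrow_fileio_adls.new_input(location).open() as inp:
        assert inp.read() == b"hello"
```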
def aws_credentials() -> None:
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
Expand Down Expand Up @@ -2162,26 +2221,6 @@ def fixture_dynamodb(_aws_credentials: None) -> Generator[boto3.client, None, No
yield boto3.client("dynamodb", region_name="us-east-1")


@pytest.fixture(scope="session")
def empty_home_dir_path(tmp_path_factory: pytest.TempPathFactory) -> str:
home_path = str(tmp_path_factory.mktemp("home"))