Commit e9c35e6
feat(pyspark): support pyspark 4.0
BREAKING CHANGE: PySpark <3.5 is no longer supported
1 parent 3f9c950 commit e9c35e6

File tree: 20 files changed (+87, -85 lines)

.github/workflows/ibis-backends.yml

Lines changed: 11 additions & 18 deletions
@@ -571,25 +571,24 @@ jobs:
       matrix:
         include:
           - python-version: "3.10"
-            pyspark-minor-version: "3.3"
+            pyspark-minor-version: "3.5"
+            scala-version: "2.12"
             tag: local
             deps:
-              - pyspark==3.3.4
-              - pandas==1.5.3
-              - numpy==1.23.5
+              - "'pyspark>=3.5,<4'"
           - python-version: "3.13"
-            pyspark-minor-version: "3.5"
+            pyspark-minor-version: "4.0"
+            scala-version: "2.13"
             tag: local
             deps:
-              - setuptools==75.1.0
-              - delta-spark==3.3.0
+              - delta-spark
           - python-version: "3.12"
-            pyspark-minor-version: "3.5"
+            pyspark-minor-version: "4.0"
             SPARK_REMOTE: "sc://localhost:15002"
+            scala-version: "2.13"
             tag: remote
             deps:
-              - setuptools==75.1.0
-              - delta-spark==3.3.0
+              - setuptools
               - googleapis-common-protos
               - grpcio
               - grpcio-status
@@ -622,17 +621,12 @@ jobs:
       - name: install uv
         uses: astral-sh/setup-uv@v7.2.1
 
-      # it requires a version of pandas that pyspark is not compatible with
-      - name: remove lonboard
-        if: matrix.pyspark-minor-version == '3.3'
-        run: uv remove --group docs --no-sync lonboard
-
       - name: install pyspark-specific dependencies
+        if: matrix.deps
         run: uv add --no-sync ${{ join(matrix.deps, ' ') }}
 
       - name: install iceberg
-        shell: bash
-        run: just download-iceberg-jar ${{ matrix.pyspark-minor-version }}
+        run: just download-iceberg-jar ${{ matrix.pyspark-minor-version }} ${{ matrix.scala-version }}
 
       - name: run spark connect tests
         if: matrix.tag == 'remote'
@@ -647,7 +641,6 @@ jobs:
         run: just ci-check "--extra pyspark --extra examples" -m pyspark
 
       - name: check that no untracked files were produced
-        shell: bash
         run: git checkout uv.lock pyproject.toml && ! git status --porcelain | grep -F .
 
       - name: upload code coverage
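With the matrix values above, the iceberg step now passes both the PySpark minor version and the Scala version to the justfile recipe, so it expands to invocations such as:

    just download-iceberg-jar 3.5 2.12
    just download-iceberg-jar 4.0 2.13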

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -149,3 +149,4 @@ jupyter_lite_config.json
 # pixi environments
 .pixi
 *.egg-info
+warehouse

compose.yaml

Lines changed: 7 additions & 2 deletions
@@ -615,10 +615,15 @@ services:
       - risingwave
 
   spark-connect:
-    image: bitnamilegacy/spark:3.5.6
+    image: bitnamilegacy/spark:4.0.0
     ports:
       - 15002:15002
-    command: /opt/bitnami/spark/sbin/start-connect-server.sh --name ibis_testing --packages org.apache.spark:spark-connect_2.12:3.5.3,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,io.delta:delta-spark_2.12:3.3.0
+    command:
+      - /opt/bitnami/spark/sbin/start-connect-server.sh
+      - --name
+      - ibis_testing
+      - --packages
+      - org.apache.spark:spark-connect_2.13:4.0.0,org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1,io.delta:delta-spark_2.13:4.0.0
     healthcheck:
       test:
         - CMD-SHELL

docker/spark-connect/conf.properties

Lines changed: 0 additions & 1 deletion
@@ -1,6 +1,5 @@
 spark.driver.extraJavaOptions=-Duser.timezone=GMT
 spark.executor.extraJavaOptions=-Duser.timezone=GMT
-spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
 spark.sql.catalog.local.type=hadoop
 spark.sql.catalog.local.warehouse=warehouse
 spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog
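The pinned iceberg runtime jar removed here is instead injected at session startup in ibis/backends/pyspark/tests/conftest.py (see the conftest diff below), where the coordinate is derived from the installed PySpark and Scala versions.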

ibis/backends/pyspark/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -128,6 +128,7 @@ class Backend(
     name = "pyspark"
     compiler = sc.pyspark.compiler
     temporary_example = False
+    overwrite_example = True
 
     class Options(ibis.config.Config):
         """PySpark options.

ibis/backends/pyspark/datatypes.py

Lines changed: 8 additions & 10 deletions
@@ -11,7 +11,6 @@
 
 # DayTimeIntervalType introduced in Spark 3.2 (at least) but didn't show up in
 # PySpark until version 3.3
-PYSPARK_33 = vparse(pyspark.__version__) >= vparse("3.3")
 PYSPARK_35 = vparse(pyspark.__version__) >= vparse("3.5")
 
 
@@ -35,13 +34,12 @@
 _to_pyspark_dtypes[dt.UUID] = pt.StringType
 
 
-if PYSPARK_33:
-    _pyspark_interval_units = {
-        pt.DayTimeIntervalType.SECOND: "s",
-        pt.DayTimeIntervalType.MINUTE: "m",
-        pt.DayTimeIntervalType.HOUR: "h",
-        pt.DayTimeIntervalType.DAY: "D",
-    }
+_pyspark_interval_units = {
+    pt.DayTimeIntervalType.SECOND: "s",
+    pt.DayTimeIntervalType.MINUTE: "m",
+    pt.DayTimeIntervalType.HOUR: "h",
+    pt.DayTimeIntervalType.DAY: "D",
+}
 
 
 class PySparkType(TypeMapper):
@@ -62,7 +60,7 @@ def to_ibis(cls, typ, nullable=True):
             fields = {f.name: cls.to_ibis(f.dataType) for f in typ.fields}
 
             return dt.Struct(fields, nullable=nullable)
-        elif PYSPARK_33 and isinstance(typ, pt.DayTimeIntervalType):
+        elif isinstance(typ, pt.DayTimeIntervalType):
             if (
                 typ.startField == typ.endField
                 and typ.startField in _pyspark_interval_units
@@ -71,7 +69,7 @@ def to_ibis(cls, typ, nullable=True):
                 return dt.Interval(unit, nullable=nullable)
             else:
                 raise com.IbisTypeError(f"{typ!r} couldn't be converted to Interval")
-        elif PYSPARK_35 and isinstance(typ, pt.TimestampNTZType):
+        elif isinstance(typ, pt.TimestampNTZType):
             return dt.Timestamp(nullable=nullable)
         elif isinstance(typ, pt.UserDefinedType):
             return cls.to_ibis(typ.sqlType(), nullable=nullable)
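As a usage illustration (a standalone sketch, not part of the commit), the now-unconditional mapping converts a PySpark day-time interval whose start and end fields agree into an ibis Interval:

    import ibis.expr.datatypes as dt
    import pyspark.sql.types as pt

    # Same unit table as in the diff above.
    _pyspark_interval_units = {
        pt.DayTimeIntervalType.SECOND: "s",
        pt.DayTimeIntervalType.MINUTE: "m",
        pt.DayTimeIntervalType.HOUR: "h",
        pt.DayTimeIntervalType.DAY: "D",
    }

    typ = pt.DayTimeIntervalType(pt.DayTimeIntervalType.DAY, pt.DayTimeIntervalType.DAY)
    if typ.startField == typ.endField and typ.startField in _pyspark_interval_units:
        unit = _pyspark_interval_units[typ.startField]
        print(dt.Interval(unit))  # an ibis interval with unit 'D'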

ibis/backends/pyspark/tests/conftest.py

Lines changed: 14 additions & 0 deletions
@@ -14,6 +14,7 @@
 import pandas as pd
 import pytest
 from filelock import FileLock
+from packaging.version import parse as parse_version
 
 import ibis
 from ibis import util
@@ -277,8 +278,20 @@ def parquet_dir(self) -> str:
 
     @staticmethod
     def connect(*, tmpdir, worker_id, **kw):  # noqa: ARG004
+        import pyspark
         from pyspark.sql import SparkSession
 
+        spark_ver = parse_version(pyspark.__version__)
+
+        (scala_jarfile,) = (
+            Path(pyspark.__file__)
+            .parent.joinpath("jars")
+            .glob("scala-library-*.jar")
+        )
+        _, scala_version_str = scala_jarfile.stem.rsplit("-", 1)
+        scala_ver = parse_version(scala_version_str)
+        iceberg_jar = f"iceberg-spark-runtime-{spark_ver.major}.{spark_ver.minor}_{scala_ver.major}.{scala_ver.minor}:1.10.1"
+
         config = reduce(
             lambda config, line: config.config(
                 *map(str.strip, line.strip().split("=", 1))
@@ -306,6 +319,7 @@ def connect(*, tmpdir, worker_id, **kw):  # noqa: ARG004
                     "spark.sql.execution.arrow.pyspark.enabled=false",
                     "spark.sql.shuffle.partitions=1",
                     "spark.storage.blockManagerSlaveTimeoutMs=4200s",
+                    f"spark.jars.packages=org.apache.iceberg:{iceberg_jar}",
                 ],
             ),
             SparkSession.builder.appName("ibis_testing"),
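The same probing as a standalone sketch (assumes a locally installed PySpark with its bundled jars): the Scala version is read off the scala-library jar filename and combined with the PySpark version to pick the matching iceberg-spark-runtime artifact.

    from pathlib import Path

    import pyspark
    from packaging.version import parse as parse_version

    spark_ver = parse_version(pyspark.__version__)

    # The diff assumes exactly one scala-library-<version>.jar ships with the PySpark install.
    (scala_jarfile,) = Path(pyspark.__file__).parent.joinpath("jars").glob("scala-library-*.jar")
    _, scala_version_str = scala_jarfile.stem.rsplit("-", 1)
    scala_ver = parse_version(scala_version_str)

    coordinate = f"org.apache.iceberg:iceberg-spark-runtime-{spark_ver.major}.{spark_ver.minor}_{scala_ver.major}.{scala_ver.minor}:1.10.1"
    print(coordinate)  # e.g. org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1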

ibis/backends/pyspark/tests/test_client.py

Lines changed: 0 additions & 4 deletions
@@ -1,12 +1,9 @@
 from __future__ import annotations
 
-import pytest
-
 import ibis
 from ibis.util import gen_name
 
 
-@pytest.mark.xfail_version(pyspark=["pyspark<3.4"], reason="no catalog support")
 def test_catalog_db_args(con):
     t = ibis.memtable({"epoch": [1712848119, 1712848121, 1712848155]})
 
@@ -61,7 +58,6 @@ def test_create_table_no_catalog(con):
     assert con.current_database != "default"
 
 
-@pytest.mark.xfail_version(pyspark=["pyspark<3.4"], reason="no catalog support")
 def test_create_table_with_partition_and_catalog(con):
     # Create a sample table with a partition column
     data = {

ibis/backends/sql/compilers/base.py

Lines changed: 1 addition & 1 deletion
@@ -899,7 +899,7 @@ def visit_Round(self, op, *, arg, digits):
     ### Dtype Dysmorphia
 
     def visit_TryCast(self, op, *, arg, to):
-        return sge.TryCast(this=arg, to=self.type_mapper.from_ibis(to))
+        return sge.TryCast(this=arg, to=self.type_mapper.from_ibis(to), safe=True)
 
     ### Comparator Conundrums
 
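A hedged illustration (not from the commit) of what the extra flag does: safe=True is sqlglot's marker for a cast that should not raise, which some dialects render with their own safe-cast syntax; the rendered SQL in the comment is an assumption.

    import sqlglot.expressions as sge

    # Build a TRY_CAST node the same way visit_TryCast does, but with a plain column.
    expr = sge.TryCast(this=sge.column("x"), to=sge.DataType.build("DOUBLE"), safe=True)
    print(expr.sql(dialect="spark"))  # assumed output: TRY_CAST(x AS DOUBLE)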

ibis/backends/tests/errors.py

Lines changed: 4 additions & 1 deletion
@@ -52,6 +52,9 @@
     from pyspark.errors.exceptions.base import (
         ArithmeticException as PySparkArithmeticException,
     )
+    from pyspark.errors.exceptions.base import (
+        NumberFormatException as PySparkNumberFormatException,
+    )
     from pyspark.errors.exceptions.base import ParseException as PySparkParseException
     from pyspark.errors.exceptions.base import PySparkValueError
     from pyspark.errors.exceptions.base import PythonException as PySparkPythonException
@@ -66,7 +69,7 @@
         PySparkPythonException
     ) = PySparkUnsupportedOperationException = PySparkConnectGrpcException = (
         PySparkValueError
-    ) = None
+    ) = PySparkNumberFormatException = None
 
 try:
     from google.api_core.exceptions import BadRequest as GoogleBadRequest
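For context, this module guards backend exception imports so the names exist even when PySpark is not installed; a minimal sketch of that pattern for the newly added exception:

    try:
        from pyspark.errors.exceptions.base import (
            NumberFormatException as PySparkNumberFormatException,
        )
    except ImportError:
        # Fall back to None so the name always resolves at import time.
        PySparkNumberFormatException = None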
