Changes and bug fixes to support shared clusters in DBR 14.2 #248

Merged
64 commits merged on Feb 22, 2024
Commits
aebbb06
wip
ronanstokes-db Mar 26, 2023
5f0ffc0
merge from origin
ronanstokes-db Mar 27, 2023
1eda552
wip
ronanstokes-db Apr 7, 2023
7de014c
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Apr 7, 2023
c859475
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Apr 9, 2023
3094e96
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Apr 13, 2023
3bf6e9b
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Apr 17, 2023
caaff18
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Apr 18, 2023
87d5c50
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Apr 18, 2023
4536794
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Apr 19, 2023
eba6193
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Apr 21, 2023
c4fdc3b
wip
ronanstokes-db May 9, 2023
8734b19
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db May 30, 2023
f063235
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Jun 28, 2023
b9fb552
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Jul 1, 2023
3eb15f4
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Jul 11, 2023
c85f915
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Jul 13, 2023
e53f8fe
changes for release
ronanstokes-db Jul 13, 2023
4259cac
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Jul 21, 2023
fef24fa
example notebook
ronanstokes-db Oct 3, 2023
35e2de4
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Oct 5, 2023
ce1a12d
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Oct 5, 2023
63c8e70
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Dec 13, 2023
3ac33c0
updates to handle shared spark session restrictions
ronanstokes-db Dec 18, 2023
c5036a6
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
e735643
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
ce83fe8
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
81310b5
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
842bf7a
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
4da31ac
Merge branch 'master' into feature_shared_support
ronanstokes-db Dec 18, 2023
9513736
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
080778c
Merge branch 'feature_shared_support' of https://github.com/databrick…
ronanstokes-db Dec 18, 2023
aff2310
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
dc49472
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
24087d3
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
090b124
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
65aae5a
changes per code review
ronanstokes-db Jan 16, 2024
1c418f0
Doc updates 032223 (#180)
ronanstokes-db Mar 25, 2023
0eec26a
Feature v34 (#201)
ronanstokes-db Apr 7, 2023
240ef41
Feature generate from existing data (#163)
ronanstokes-db Apr 9, 2023
383da23
Remove calls to root logger. (#205)
MarvinSchenkel Apr 13, 2023
c95a995
Release v34post1 (#206)
ronanstokes-db Apr 13, 2023
9b48095
Fix doc typos and minor clarification (#207)
ronanstokes-db Apr 18, 2023
4ce97cd
Feature issue 209 (#210)
ronanstokes-db Apr 18, 2023
571e4f2
Release 0v34post2 (#211)
ronanstokes-db Apr 19, 2023
318fcfc
Feature html formatting (#208)
ronanstokes-db Apr 20, 2023
f935c63
wip
ronanstokes-db May 9, 2023
c01baa3
Build fixes (#213)
ronanstokes-db May 9, 2023
bc2bd57
Feature doc change generating text (#218)
ronanstokes-db Jun 15, 2023
b2e09ca
Feature build update (#220)
ronanstokes-db Jun 28, 2023
9b0847b
Feature struct changes (#219)
ronanstokes-db Jul 8, 2023
1dc6606
Feature additional docs (#222)
ronanstokes-db Jul 12, 2023
0de8665
changes for release
ronanstokes-db Jul 13, 2023
7dca740
example notebook
ronanstokes-db Oct 3, 2023
fe6865e
Feature readme updates - updates readme to note compatible Unity Cata…
ronanstokes-db Oct 5, 2023
23d5db9
Feature add codeowners (#238)
ronanstokes-db Oct 5, 2023
aa772f0
Test assign (#239)
nfx Nov 22, 2023
c830d20
updates to handle shared spark session restrictions
ronanstokes-db Dec 18, 2023
a482861
Update LICENSE (#246)
nfx Dec 18, 2023
ed989ed
updates to handle shared sparkSession
ronanstokes-db Dec 18, 2023
a59df0e
changes per code review
ronanstokes-db Jan 16, 2024
3c55687
Merge branch 'feature_shared_support' of https://github.com/databrick…
ronanstokes-db Feb 17, 2024
64ed5de
Merge branch 'master' of https://github.com/databrickslabs/dbldatagen
ronanstokes-db Feb 17, 2024
22925fd
Merge branch 'master' into feature_shared_support
ronanstokes-db Feb 17, 2024
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -3,10 +3,10 @@
## Change History
All notable changes to the Databricks Labs Data Generator will be documented in this file.



#### Changed
* Updated readme to include details on which versions of the Databricks runtime support Unity Catalog `shared` access mode.
* Updated code to use a default parallelism of 200 when using a shared Spark session and the `sparkContext` is not accessible
* Updated code to use Spark's SQL function `element_at` instead of array indexing, due to incompatibility with shared clusters


### Version 0.3.5
2 changes: 1 addition & 1 deletion dbldatagen/__init__.py
@@ -26,7 +26,7 @@
from .data_generator import DataGenerator
from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_RANDOM, RANDOM_SEED_FIXED, \
RANDOM_SEED_HASH_FIELD_NAME, MIN_PYTHON_VERSION, MIN_SPARK_VERSION, \
- INFER_DATATYPE
+ INFER_DATATYPE, SPARK_DEFAULT_PARALLELISM
from .utils import ensure, topologicalSort, mkBoundsList, coalesce_values, \
deprecated, parse_time_interval, DataGenError, split_list_matching_condition, strip_margins, \
json_value_from_path, system_time_millis
2 changes: 1 addition & 1 deletion dbldatagen/column_generation_spec.py
@@ -1085,7 +1085,7 @@ def _makeSingleGenerationExpression(self, index=None, use_pandas_optimizations=T
.astype(self.datatype))

if self.values is not None:
- new_def = array([lit(x) for x in self.values])[new_def.astype(IntegerType())]
+ new_def = F.element_at(F.array([F.lit(x) for x in self.values]), new_def.astype(IntegerType()) + 1)
elif type(self.datatype) is StringType and self.expr is None:
new_def = self._applyPrefixSuffixExpressions(self.prefix, self.suffix, new_def)

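For context on the column_generation_spec.py change above: `element_at` uses 1-based indexing, which is why the new expression adds `+ 1` to the computed index. Below is a minimal standalone sketch (not part of the PR; the value list and column names are illustrative) comparing the two lookup styles.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

values = ["alpha", "beta", "gamma"]                      # illustrative values list
df = spark.range(6).withColumnRenamed("id", "idx")
arr = F.array([F.lit(v) for v in values])

# Old style: 0-based lookup via Column indexing; reported in this PR as incompatible with shared clusters
old_style = arr[(F.col("idx") % 3).cast("int")]

# New style: element_at is 1-based, so the computed 0-based index needs "+ 1"
new_style = F.element_at(arr, (F.col("idx") % 3).cast("int") + 1)

# Both expressions select the same values from the array
df.select("idx", old_style.alias("old_style"), new_style.alias("new_style")).show()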
24 changes: 21 additions & 3 deletions dbldatagen/data_generator.py
@@ -16,7 +16,7 @@
from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, \
DEFAULT_SEED_COLUMN, SPARK_RANGE_COLUMN, MIN_SPARK_VERSION, \
OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD, \
- INFER_DATATYPE
+ INFER_DATATYPE, SPARK_DEFAULT_PARALLELISM
from .html_utils import HtmlUtils
from .schema_parser import SchemaParser
from .spark_singleton import SparkSingleton
@@ -50,6 +50,9 @@ class DataGenerator:
it is recommended that you use a different name for the seed column - for example `_id`.

This may be specified by setting the `seedColumnName` attribute to `_id`

Note: in a shared Spark session, the `sparkContext` attribute is not available, so the default parallelism is set to 200.
We recommend passing an explicit value for `partitions` in this case.
"""

# class vars
@@ -97,9 +100,8 @@ def __init__(self, sparkSession=None, name=None, randomSeedMethod=None,
# if the active Spark session is stopped, you may end up with a valid SparkSession object but the underlying
# SparkContext will be invalid
assert sparkSession is not None, "Spark session not initialized"
- assert sparkSession.sparkContext is not None, "Expecting spark session to have valid sparkContext"

- self.partitions = partitions if partitions is not None else sparkSession.sparkContext.defaultParallelism
+ self.partitions = partitions if partitions is not None else self._getDefaultSparkParallelism(sparkSession)

# check for old versions of args
if "starting_id" in kwargs:
@@ -239,6 +241,22 @@ def _setupLogger(self):
else:
self.logger.setLevel(logging.WARNING)

@staticmethod
def _getDefaultSparkParallelism(sparkSession):
"""Get the default parallelism for a spark session, if spark session supports getting the sparkContext
:param sparkSession: spark session
:return: default parallelism
"""
try:
if sparkSession.sparkContext is not None:
return sparkSession.sparkContext.defaultParallelism
else:
return SPARK_DEFAULT_PARALLELISM
except Exception as err: # pylint: disable=broad-exception-caught
err_msg = f"Error getting default parallelism, using default setting of {SPARK_DEFAULT_PARALLELISM}"
logging.warning(err_msg)
return SPARK_DEFAULT_PARALLELISM

@classmethod
def useSeed(cls, seedVal):
""" set seed for random number generation
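A brief usage sketch of the recommendation in the new docstring note: on a shared cluster, pass `partitions` explicitly rather than relying on the 200-partition fallback. This is not part of the PR; the column names, row counts, and partition count below are hypothetical.

import dbldatagen as dg
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()   # on Databricks, the ambient `spark` session would be used

# Without an explicit `partitions` value, a shared Spark session (no usable sparkContext)
# makes the generator fall back to SPARK_DEFAULT_PARALLELISM (200).
spec = (dg.DataGenerator(spark, name="example_data", rows=1_000_000, partitions=8)
        .withColumn("device_id", "long", minValue=1, maxValue=100_000)
        .withColumn("reading", "double", minValue=0.0, maxValue=100.0, random=True))

df = spec.build()
df.show(5)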
5 changes: 4 additions & 1 deletion dbldatagen/datagen_constants.py
@@ -42,4 +42,7 @@
OPTION_RANDOM_SEED_METHOD = "randomSeedMethod"
OPTION_RANDOM_SEED = "randomSeed"

- INFER_DATATYPE = "__infer__"
+ INFER_DATATYPE = "__infer__"

# default parallelism when sparkContext is not available
SPARK_DEFAULT_PARALLELISM = 200
82 changes: 82 additions & 0 deletions tests/test_shared_env.py
@@ -0,0 +1,82 @@
import logging
from unittest.mock import Mock, PropertyMock

import pytest
import dbldatagen as dg


@pytest.fixture(scope="class")
def setupLogging():
FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(format=FORMAT)


class TestSharedEnv:
"""Tests to simulate testing under a Unity Catalog shared environment. In a Unity Catalog shared environment with
the 14.x versions of the Databricks runtime, the sparkSession object does not support use of the sparkContext
attribute to get the default parallelism. In this case, we want to catch errors and return a default of
200 as the default number of partitions. This is the same as the default parallelism in many versions of Spark.


"""
SMALL_ROW_COUNT = 100000
COLUMN_COUNT = 10

@pytest.fixture(scope="class")
def sparkSession(self, setupLogging):
spark = dg.SparkSingleton.getLocalInstance("unit tests")
return spark

@pytest.fixture(scope="class")
def sharedSparkSession(self, setupLogging):
spark = Mock(wraps=dg.SparkSingleton.getLocalInstance("unit tests"))
del spark.sparkContext
return spark

@pytest.fixture(scope="class")
def sparkSessionNullContext(self, setupLogging):

class MockSparkSession:
def __init__(self):
self.sparkContext = None

spark = MockSparkSession()
return spark

def test_getDefaultParallelism(self, sparkSession):
"""Test that the default parallelism is returned when the sparkSession object supports use of the
sparkContext attribute to get the default parallelism.

:param sparkSession: The sparkSession object to use for the test.
"""
defaultParallelism = dg.DataGenerator._getDefaultSparkParallelism(sparkSession)
assert defaultParallelism == sparkSession.sparkContext.defaultParallelism

def test_getSharedDefaultParallelism(self, sharedSparkSession):
"""Test that the default parallelism is returned when the sparkSession object supports use of the
sparkContext attribute to get the default parallelism, but that a constant is return when the `sparkContext`
attribute is not available.
"""
defaultParallelism = dg.DataGenerator._getDefaultSparkParallelism(sharedSparkSession)
assert defaultParallelism == dg.SPARK_DEFAULT_PARALLELISM

def test_getNullContextDefaultParallelism(self, sparkSessionNullContext):
"""Test that the default parallelism is returned when the sparkSession object supports use of the
sparkContext attribute to get the default parallelism.

:param sparkSession: The sparkSession object to use for the test.
"""
defaultParallelism = dg.DataGenerator._getDefaultSparkParallelism(sparkSessionNullContext)
assert defaultParallelism == dg.SPARK_DEFAULT_PARALLELISM

def test_mocked_shared_session1(self, sharedSparkSession):
# validate that accessing the sparkContext on the shared spark session raises an exception
with pytest.raises(Exception) as excinfo:
context = sharedSparkSession.sparkContext

assert "sparkContext" in str(excinfo.value)

def test_null_context_spark_session(self, sparkSessionNullContext):
# validate that the sparkContext attribute on the null-context spark session is None
context = sparkSessionNullContext.sparkContext
assert context is None
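As an aside on the mocking technique used by the sharedSparkSession fixture above: deleting an attribute from a unittest.mock.Mock makes subsequent access raise AttributeError, which is how these tests simulate a DBR 14.x shared cluster that hides sparkContext. A small standalone illustration follows; the FakeSession class is invented for the example and is not part of the PR.

from unittest.mock import Mock

class FakeSession:
    sparkContext = "pretend-context"     # stands in for a real SparkContext

shared = Mock(wraps=FakeSession())
del shared.sparkContext                  # attribute access now raises AttributeError

try:
    _ = shared.sparkContext
except AttributeError as err:
    print(f"access failed as expected: {err}")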