
Incremental using airflow ranges and a naive datetime cursor column introduces lag #2225

Open
julesmga opened this issue Jan 17, 2025 · 2 comments

julesmga commented Jan 17, 2025

dlt version

1.5.0

Describe the problem

When using Airflow dates for incremental loading, the resulting SQL query uses UTC dates. In Snowflake at least, when a timestamp without time zone is compared to a timestamp with time zone, the zone is ignored entirely. If your data is in UTC, you get at most one DAG interval of lag, which is expected.

In our case (CET), however, we get a lag equal to our DAG frequency plus whatever UTC offset applies at that time of year. Ideally we would use zone-aware cursor columns, but unfortunately many vendors do not use zoned timestamps at all. I suspect it would be even worse for users on the other side of UTC, as the UTC ranges would effectively lie in the future and return nothing at all.
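To make the lag concrete, here is a small pendulum sketch (the times and zone are illustrative):

import pendulum

# A row updated at 10:30 local time (CET, UTC+1) is stored in the naive
# cursor column as a plain "10:30".
row_cursor_naive = pendulum.naive(2025, 1, 17, 10, 30)

# Airflow hands dlt the interval end for that same instant in UTC: 09:30.
interval_end_utc = pendulum.datetime(2025, 1, 17, 9, 30, tz="UTC")

# Since the zone is ignored in the comparison, the row looks like it is an
# hour in the future and is skipped until a later run -- the offset-sized
# extra lag described above.
print(row_cursor_naive <= interval_end_utc.naive())  # False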

We tried many things: adding timezone information to all Airflow dates and to the pyarrow backend kwargs, but neither helped. We then modified the _join_external_scheduler function to convert the Airflow dates to our timezone, but the rows were then filtered out, because the cursor column is treated as UTC by the pyarrow backend and falls outside the zoned range.

The only thing that worked was to keep the dates in UTC but apply our current UTC offset to them. There is probably a much better way to do this, but here is a crude fix:

from typing import Optional

import pendulum
from airflow.operators.python import get_current_context

from dlt.common import logger
from dlt.extract.incremental import Incremental

# The zone the naive cursor column is written in (Europe/Paris is illustrative).
TZ = "Europe/Paris"


def add_zone_offset(utc_date: pendulum.DateTime, tz: str) -> pendulum.DateTime:
    # Shift the UTC instant forward by the zone's current UTC offset,
    # keeping UTC as the nominal zone.
    date = utc_date.in_tz(tz)
    return utc_date + date.utcoffset()


class NaiveIncremental(Incremental):
    def _join_external_scheduler(self) -> None:
        def _ensure_airflow_end_date(
            start_date: pendulum.DateTime, end_date: Optional[pendulum.DateTime]
        ) -> Optional[pendulum.DateTime]:
            now = add_zone_offset(pendulum.now("UTC"), TZ)
            if end_date is None or end_date > now or start_date == end_date:
                return now
            return end_date

        context = get_current_context()
        start_date = add_zone_offset(context["data_interval_start"], TZ)
        end_date = _ensure_airflow_end_date(
            start_date, add_zone_offset(context["data_interval_end"], TZ)
        )
        self.initial_value = start_date
        self.end_value = end_date
        logger.info(
            f"Found Airflow scheduler: initial value: {self.initial_value} from"
            f" data_interval_start {context['data_interval_start']}, end value:"
            f" {self.end_value} from data_interval_end {context['data_interval_end']}"
        )
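The subclass is swapped in where dlt.sources.incremental would normally go, e.g. via apply_hints (a sketch; sql_table and the orders/updated_at names are illustrative):

from dlt.sources.sql_database import sql_table

# `updated_at` is the naive CET cursor column.
orders = sql_table(table="orders")
orders.apply_hints(incremental=NaiveIncremental("updated_at"))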

Expected behavior

dlt should have a way to specify a zone for naive cursor columns.

Steps to reproduce

Use Airflow ranges on a naive cursor column whose wall-clock values are not UTC.
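A minimal repro sketch under these assumptions (dlt's sql_database source, illustrative table/column names; the pipeline must run inside an Airflow task, e.g. via dlt's PipelineTasksGroup, so that Incremental joins the scheduler ranges):

import dlt
from dlt.sources.sql_database import sql_table

# `updated_at` is a naive TIMESTAMP_NTZ column holding CET wall-clock times.
orders = sql_table(table="orders")
orders.apply_hints(incremental=dlt.sources.incremental("updated_at"))

pipeline = dlt.pipeline(pipeline_name="repro", destination="snowflake")
# When executed inside an Airflow task, Incremental picks up the UTC
# data_interval_start / data_interval_end from the task context, and the
# generated WHERE clause compares them against the naive column.
pipeline.run(orders)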

Operating system

Linux

Runtime environment

Google Cloud Composer

Python version

3.11

dlt data source

SQLAlchemy (Snowflake)

dlt destination

Snowflake

Other deployment details

No response

Additional information

No response

rudolfix self-assigned this on Jan 20, 2025
rudolfix added the question (Further information is requested) label on Jan 20, 2025
rudolfix moved this from Todo to In Progress in dlt core library on Jan 20, 2025
rudolfix (Collaborator) commented

@julesmga so the situation is:

  • you have naive datetimes in the database
  • Incremental adopts the range (start/end date) from the Airflow scheduler, which is in UTC
  • it then generates SQL queries that compare UTC dates with naive (db) dates, which is obviously incorrect

Is that correct?

  1. Did you try to set up your DAG using correct timezones (https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/timezone.html)? Is context["data_interval_start"] always UTC even if you use a non-UTC start_date in your DAG? (See the sketch after this comment.)
  2. Maybe you are better off switching the default Airflow timezone to "your" timezone, since you already have naive datetimes...

I'm just making sure whether we should fix dlt or simply improve our docs.
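A small sketch of point 1 (hedged; assumes Airflow 2.x TaskFlow, and that the context datetimes are normalized to UTC, which julesmga confirms below):

import pendulum
from airflow.decorators import dag, task

@dag(
    start_date=pendulum.datetime(2025, 1, 1, tz="Europe/Paris"),  # non-UTC start_date
    schedule="@hourly",
    catchup=False,
)
def check_context_tz():
    @task
    def show_interval(data_interval_start=None, data_interval_end=None):
        # Airflow injects these context variables as UTC-aware pendulum
        # datetimes regardless of the DAG's timezone.
        print(data_interval_start.timezone_name, data_interval_end.timezone_name)

    show_interval()

check_context_tz()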

julesmga (Author) commented

@rudolfix Correct, the situation is as you described. Our DAGs already use dates in our timezone, and the Airflow default timezone is also set accordingly; nevertheless the Airflow context is in UTC, and so are the loaded arrow tables (even when passing tz in the backend kwargs).

Initially I tried simply converting the UTC Airflow ranges to our timezone. That worked as intended on the database side, but the loaded rows were then instantly discarded by dlt's start_out_of_range and end_out_of_range logic, since naive dates are treated as UTC and thus fell outside the zoned range. The current workaround was the next best thing.

Maybe some kind of timezone compensation could be integrated into the incremental logic. Converting zones would be the most intuitive, but I don't think it would behave the same across all backends (depending on how they treat naive dates), so applying an offset as I currently do may be the most universal fix.
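To illustrate the difference between the two approaches, a small pendulum sketch (times are illustrative):

import pendulum

utc_end = pendulum.datetime(2025, 1, 17, 9, 30, tz="UTC")

converted = utc_end.in_tz("Europe/Paris")  # 10:30+01:00 -- the same instant
shifted = utc_end + converted.utcoffset()  # 10:30+00:00 -- a different instant

# `converted` still equals utc_end as an instant, so dlt's range checks
# (which treat naive cursor values as UTC) reject rows between 09:30 and
# 10:30 naive. `shifted` moves the watermark itself, so the naive SQL
# comparison and the range checks agree on the same wall-clock value --
# at the cost of the watermark no longer naming the true instant.
print(converted == utc_end)  # True
print(shifted == utc_end)    # False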
