PyIceberg 0.2.1: PyArrow and DuckDB

# Fetch Tabular's docker-spark-iceberg quickstart. Judging by the hosts used
# later in this walkthrough (Spark, rest:8181, minio:9000), it presumably provides
# Spark, an Iceberg REST catalog and MinIO object storage.
git clone https://github.com/tabular-io/docker-spark-iceberg.git
cd docker-spark-iceberg
# Start all services attached to this terminal (no -d flag).
# NOTE(review): on installs that ship Compose V2 the command is `docker compose up`.
docker-compose up
%%sql
-- Namespace holding the NYC taxi example table.
CREATE DATABASE IF NOT EXISTS nyc;

-- Iceberg table intended to match the columns of the yellow_tripdata_*.parquet
-- files appended into it, partitioned by the day of the pickup timestamp.
-- NOTE(review): monetary columns are DOUBLE, presumably to match the source
-- Parquet types; DECIMAL would be exact but would need a cast on load.
CREATE TABLE IF NOT EXISTS nyc.taxis (
    VendorID              BIGINT,
    tpep_pickup_datetime  TIMESTAMP,
    tpep_dropoff_datetime TIMESTAMP,
    passenger_count       DOUBLE,
    trip_distance         DOUBLE,
    RatecodeID            DOUBLE,
    store_and_fwd_flag    STRING,
    PULocationID          BIGINT,
    DOLocationID          BIGINT,
    payment_type          BIGINT,
    fare_amount           DOUBLE,
    extra                 DOUBLE,
    mta_tax               DOUBLE,
    tip_amount            DOUBLE,
    tolls_amount          DOUBLE,
    improvement_surcharge DOUBLE,
    total_amount          DOUBLE,
    congestion_surcharge  DOUBLE,
    airport_fee           DOUBLE
)
USING iceberg
PARTITIONED BY (days(tpep_pickup_datetime))
from pyspark.sql import SparkSession

# Reuse the notebook's Spark session if one exists, otherwise create it.
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

# Append each monthly yellow-taxi Parquet file to the Iceberg table, one write
# per file. NOTE(review): assumes the files were already downloaded to
# /home/iceberg/data/ inside the container.
for filename in [
    "yellow_tripdata_2022-04.parquet",
    "yellow_tripdata_2022-03.parquet",
    "yellow_tripdata_2022-02.parquet",
    "yellow_tripdata_2022-01.parquet",
    "yellow_tripdata_2021-12.parquet",
]:
    df = spark.read.parquet(f"/home/iceberg/data/{filename}")
    df.write.mode("append").saveAsTable("nyc.taxis")

Load data into PyArrow

from pyiceberg.catalog import load_catalog

# Connection settings for the REST catalog and its S3-compatible (MinIO) storage.
# NOTE(review): hard-coded demo credentials; do not reuse outside the local sandbox.
catalog_properties = {
    'uri': 'http://rest:8181',
    's3.endpoint': 'http://minio:9000',
    's3.access-key-id': 'admin',
    's3.secret-access-key': 'password',
}
catalog = load_catalog('default', **catalog_properties)

# Handle to the table written by the Spark job above.
tbl = catalog.load_table('nyc.taxis')
from pyiceberg.expressions import GreaterThanOrEqual

# Keep only trips picked up on or after 2022-01-01 00:00 UTC. The table is
# partitioned by pickup day, so the scan can presumably skip older partitions.
pickup_since_2022 = GreaterThanOrEqual(
    "tpep_pickup_datetime",
    "2022-01-01T00:00:00.000000+00:00",
)
sc = tbl.scan(row_filter=pickup_since_2022)

# Materialise the matching rows as an Arrow table, then convert to pandas.
arrow_table = sc.to_arrow()
df = arrow_table.to_pandas()
RangeIndex: 12671129 entries, 0 to 12671128
Data columns (total 19 columns):
# Column Dtype
--- ------ -----
0 VendorID int64
1 tpep_pickup_datetime datetime64[ns, UTC]
2 tpep_dropoff_datetime datetime64[ns, UTC]
3 passenger_count float64
4 trip_distance float64
5 RatecodeID float64
6 store_and_fwd_flag object
7 PULocationID int64
8 DOLocationID int64
9 payment_type int64
10 fare_amount float64
11 extra float64
12 mta_tax float64
13 tip_amount float64
14 tolls_amount float64
15 improvement_surcharge float64
16 total_amount float64
17 congestion_surcharge float64
18 airport_fee float64
dtypes: datetime64[ns, UTC](2), float64(12), int64(4), object(1)
memory usage: 1.8+ GB
import numpy as np
from scipy import stats

# Drop fare outliers: keep rows whose fare_amount is within three population
# standard deviations of the mean on either side, i.e. |z-score| < 3.
# The z-scores are computed once and reused as the mask. nan_policy='omit'
# keeps a null fare from turning every z-score into NaN, which would otherwise
# drop every row; rows with a null fare still get a NaN z-score and are removed.
fare_zscore = stats.zscore(df['fare_amount'], nan_policy='omit')
df = df[np.abs(fare_zscore) < 3]

# Remove zero and negative fares.
df = df[df['fare_amount'] > 0]

Query using DuckDB

%%sql --save tip-amount --no-execute

SELECT
    -- Raw, unfiltered tips from the pandas DataFrame `df`; the snippet is only
    -- saved (--no-execute) so other cells can reference it by name.
    tip_amount
FROM df
%sqlplot histogram --table df --column tip_amount --bins 22 --with tip-amount
%%sql --save tip-amount-filtered --no-execute

WITH tip_amount_stats AS (
    -- Population mean and standard deviation of tip_amount over all rows of df
    -- (STDDEV_POP matches the ddof=0 default of scipy.stats.zscore).
    -- No GROUP BY, so this CTE yields exactly one row.
    SELECT
        AVG(tip_amount) AS tip_amount_mean,
        STDDEV_POP(tip_amount) AS tip_amount_stddev
    FROM df
)

SELECT tip_amount
FROM df
CROSS JOIN tip_amount_stats
WHERE tip_amount > 0
    -- Cut off tips more than three standard deviations above the mean,
    -- mirroring the |z-score| < 3 filter applied to fare_amount.
    AND tip_amount < tip_amount_mean + 3 * tip_amount_stddev
%sqlplot histogram --table tip-amount-filtered --column tip_amount --bins 50 --with tip-amount-filtered

--

--

Tabular is building an independent cloud-native data platform powered by the open source standard for huge analytic datasets, Apache Iceberg.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App Store
A button that says 'Get it on Google Play', and if clicked it will lead you to the Google Play store
Tabular

Tabular is building an independent cloud-native data platform powered by the open source standard for huge analytic datasets, Apache Iceberg.