
Commit a082616

feat: sample for AppendRowsStream with Arrow (#915)
* feat: sample for AppendRowsStream with Arrow
* Update requirements.txt
* add test fixtures
* lint
* remove unused code
* correct fixtures
* add log for debugging
* remove debug code and small fixes
* remove fastavro in requirements.txt
* remove version for storage
* add print
* use schema in template and add table verification
* add a simple strategy to split large tables
* fix unit test
* wording
* add dependency
* address comments
1 parent a694aad commit a082616

9 files changed: +582, -8 lines changed
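
In outline, the new sample attaches a serialized pyarrow schema to the AppendRowsRequest template, opens an AppendRowsStream against a table's _default write stream, and sends serialized Arrow record batches. A condensed sketch of that flow, with placeholder project, dataset, and table names (the full sample below also covers schema setup for many BigQuery types, request chunking, and result verification):

import pyarrow as pa

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream

# Placeholder schema and data; the real sample exercises many more column types.
schema = pa.schema([pa.field("int64_col", pa.int64())])
batch = pa.record_batch([pa.array([1, 2, 3])], schema=schema)

# Template: target the table's _default stream and declare the Arrow schema.
template = gapic_types.AppendRowsRequest()
template.write_stream = (
    "projects/my-project/datasets/my_dataset/tables/my_table/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = schema.serialize().to_pybytes()
template.arrow_rows = arrow_data

# Open the stream and append one serialized record batch.
stream = AppendRowsStream(bigquery_storage_v1.BigQueryWriteClient(), template)
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = batch.serialize().to_pybytes()
stream.send(request).result()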

packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/writer.py

Lines changed: 7 additions & 5 deletions
@@ -72,11 +72,13 @@ def _process_request_template(
     # The protobuf payload will be decoded as proto2 on the server side. The
     # schema is also specified as proto2. Hence we must clear proto3-only
     # features. This works since proto2 and proto3 are binary-compatible.
-    proto_descriptor = template_copy.proto_rows.writer_schema.proto_descriptor
-    for field in proto_descriptor.field:
-        field.ClearField("oneof_index")
-        field.ClearField("proto3_optional")
-    proto_descriptor.ClearField("oneof_decl")
+    oneof_field = template_copy._pb.WhichOneof("rows")
+    if oneof_field == "proto_rows":
+        proto_descriptor = template_copy.proto_rows.writer_schema.proto_descriptor
+        for field in proto_descriptor.field:
+            field.ClearField("oneof_index")
+            field.ClearField("proto3_optional")
+        proto_descriptor.ClearField("oneof_decl")
 
     return template_copy
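
The new guard matters because the request template's "rows" oneof can now carry arrow_rows instead of proto_rows, and an Arrow template has no proto descriptor to scrub. A minimal sketch of how WhichOneof distinguishes the two template kinds (illustrative only; the field values are placeholders):

import pyarrow as pa

from google.cloud.bigquery_storage_v1 import types as gapic_types

# Template carrying Arrow data: the "rows" oneof resolves to "arrow_rows".
arrow_template = gapic_types.AppendRowsRequest()
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
    pa.schema([pa.field("int64_col", pa.int64())]).serialize().to_pybytes()
)
arrow_template.arrow_rows = arrow_data

# Template carrying protobuf rows: the oneof resolves to "proto_rows".
proto_template = gapic_types.AppendRowsRequest()
proto_template.proto_rows = gapic_types.AppendRowsRequest.ProtoData()

print(arrow_template._pb.WhichOneof("rows"))  # arrow_rows -> descriptor cleanup skipped
print(proto_template._pb.WhichOneof("rows"))  # proto_rows -> proto3-only fields cleared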

packages/google-cloud-bigquery-storage/samples/conftest.py

Lines changed: 18 additions & 0 deletions
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import datetime
 import os
 
 import pytest
@@ -20,3 +21,20 @@
 @pytest.fixture(scope="session")
 def project_id():
     return os.environ["GOOGLE_CLOUD_PROJECT"]
+
+
+@pytest.fixture(scope="session")
+def dataset(project_id):
+    from google.cloud import bigquery
+
+    client = bigquery.Client()
+    dataset_suffix = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
+    dataset_name = "samples_tests_" + dataset_suffix
+
+    dataset_id = "{}.{}".format(project_id, dataset_name)
+    dataset = bigquery.Dataset(dataset_id)
+    dataset.location = "us-east7"
+    created_dataset = client.create_dataset(dataset)
+    yield created_dataset
+
+    client.delete_dataset(created_dataset, delete_contents=True)
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
Lines changed: 212 additions & 0 deletions
@@ -0,0 +1,212 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import datetime
+import decimal
+import pandas as pd
+import pyarrow as pa
+
+from google.cloud import bigquery
+from google.cloud.bigquery import enums
+from google.cloud.bigquery_storage_v1 import types as gapic_types
+from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
+
+
+TABLE_LENGTH = 100_000
+
+BQ_SCHEMA = [
+    bigquery.SchemaField("bool_col", enums.SqlTypeNames.BOOLEAN),
+    bigquery.SchemaField("int64_col", enums.SqlTypeNames.INT64),
+    bigquery.SchemaField("float64_col", enums.SqlTypeNames.FLOAT64),
+    bigquery.SchemaField("numeric_col", enums.SqlTypeNames.NUMERIC),
+    bigquery.SchemaField("bignumeric_col", enums.SqlTypeNames.BIGNUMERIC),
+    bigquery.SchemaField("string_col", enums.SqlTypeNames.STRING),
+    bigquery.SchemaField("bytes_col", enums.SqlTypeNames.BYTES),
+    bigquery.SchemaField("date_col", enums.SqlTypeNames.DATE),
+    bigquery.SchemaField("datetime_col", enums.SqlTypeNames.DATETIME),
+    bigquery.SchemaField("time_col", enums.SqlTypeNames.TIME),
+    bigquery.SchemaField("timestamp_col", enums.SqlTypeNames.TIMESTAMP),
+    bigquery.SchemaField("geography_col", enums.SqlTypeNames.GEOGRAPHY),
+    bigquery.SchemaField(
+        "range_date_col", enums.SqlTypeNames.RANGE, range_element_type="DATE"
+    ),
+    bigquery.SchemaField(
+        "range_datetime_col",
+        enums.SqlTypeNames.RANGE,
+        range_element_type="DATETIME",
+    ),
+    bigquery.SchemaField(
+        "range_timestamp_col",
+        enums.SqlTypeNames.RANGE,
+        range_element_type="TIMESTAMP",
+    ),
+]
+
+PYARROW_SCHEMA = pa.schema(
+    [
+        pa.field("bool_col", pa.bool_()),
+        pa.field("int64_col", pa.int64()),
+        pa.field("float64_col", pa.float64()),
+        pa.field("numeric_col", pa.decimal128(38, scale=9)),
+        pa.field("bignumeric_col", pa.decimal256(76, scale=38)),
+        pa.field("string_col", pa.string()),
+        pa.field("bytes_col", pa.binary()),
+        pa.field("date_col", pa.date32()),
+        pa.field("datetime_col", pa.timestamp("us")),
+        pa.field("time_col", pa.time64("us")),
+        pa.field("timestamp_col", pa.timestamp("us")),
+        pa.field("geography_col", pa.string()),
+        pa.field(
+            "range_date_col",
+            pa.struct([("start", pa.date32()), ("end", pa.date32())]),
+        ),
+        pa.field(
+            "range_datetime_col",
+            pa.struct([("start", pa.timestamp("us")), ("end", pa.timestamp("us"))]),
+        ),
+        pa.field(
+            "range_timestamp_col",
+            pa.struct([("start", pa.timestamp("us")), ("end", pa.timestamp("us"))]),
+        ),
+    ]
+)
+
+
+def bqstorage_write_client():
+    from google.cloud import bigquery_storage_v1
+
+    return bigquery_storage_v1.BigQueryWriteClient()
+
+
+def make_table(project_id, dataset_id, bq_client):
+    table_id = "append_rows_w_arrow_test"
+    table_id_full = f"{project_id}.{dataset_id}.{table_id}"
+    bq_table = bigquery.Table(table_id_full, schema=BQ_SCHEMA)
+    created_table = bq_client.create_table(bq_table)
+
+    return created_table
+
+
+def create_stream(bqstorage_write_client, table):
+    stream_name = f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}/_default"
+    request_template = gapic_types.AppendRowsRequest()
+    request_template.write_stream = stream_name
+
+    # Add schema to the template.
+    arrow_data = gapic_types.AppendRowsRequest.ArrowData()
+    arrow_data.writer_schema.serialized_schema = PYARROW_SCHEMA.serialize().to_pybytes()
+    request_template.arrow_rows = arrow_data
+
+    append_rows_stream = AppendRowsStream(
+        bqstorage_write_client,
+        request_template,
+    )
+    return append_rows_stream
+
+
+def generate_pyarrow_table(num_rows=TABLE_LENGTH):
+    date_1 = datetime.date(2020, 10, 1)
+    date_2 = datetime.date(2021, 10, 1)
+
+    datetime_1 = datetime.datetime(2016, 12, 3, 14, 11, 27, 123456)
+    datetime_2 = datetime.datetime(2017, 12, 3, 14, 11, 27, 123456)
+
+    timestamp_1 = datetime.datetime(
+        1999, 12, 31, 23, 59, 59, 999999, tzinfo=datetime.timezone.utc
+    )
+    timestamp_2 = datetime.datetime(
+        2000, 12, 31, 23, 59, 59, 999999, tzinfo=datetime.timezone.utc
+    )
+
+    # Pandas Dataframe.
+    rows = []
+    for i in range(num_rows):
+        row = {
+            "bool_col": True,
+            "int64_col": i,
+            "float64_col": float(i),
+            "numeric_col": decimal.Decimal("0.000000001"),
+            "bignumeric_col": decimal.Decimal("0.1234567891"),
+            "string_col": "data as string",
+            "bytes_col": str.encode("data in bytes"),
+            "date_col": datetime.date(2019, 5, 10),
+            "datetime_col": datetime_1,
+            "time_col": datetime.time(23, 59, 59, 999999),
+            "timestamp_col": timestamp_1,
+            "geography_col": "POINT(-121 41)",
+            "range_date_col": {"start": date_1, "end": date_2},
+            "range_datetime_col": {"start": datetime_1, "end": datetime_2},
+            "range_timestamp_col": {"start": timestamp_1, "end": timestamp_2},
+        }
+        rows.append(row)
+    df = pd.DataFrame(rows)
+
+    # Dataframe to PyArrow Table.
+    table = pa.Table.from_pandas(df, schema=PYARROW_SCHEMA)
+
+    return table
+
+
+def generate_write_requests(pyarrow_table):
+    # Determine max_chunksize of the record batches. Because max size of
+    # AppendRowsRequest is 10 MB, we need to split the table if it's too big.
+    # See: https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#appendrowsrequest
+    max_request_bytes = 10 * 2**20  # 10 MB
+    chunk_num = int(pyarrow_table.nbytes / max_request_bytes) + 1
+    chunk_size = int(pyarrow_table.num_rows / chunk_num)
+
+    # Construct request(s).
+    for batch in pyarrow_table.to_batches(max_chunksize=chunk_size):
+        request = gapic_types.AppendRowsRequest()
+        request.arrow_rows.rows.serialized_record_batch = batch.serialize().to_pybytes()
+        yield request
+
+
+def append_rows(bqstorage_write_client, table):
+    append_rows_stream = create_stream(bqstorage_write_client, table)
+    pyarrow_table = generate_pyarrow_table()
+    futures = []
+
+    for request in generate_write_requests(pyarrow_table):
+        response_future = append_rows_stream.send(request)
+        futures.append(response_future)
+        response_future.result()
+
+    return futures
+
+
+def verify_result(client, table, futures):
+    bq_table = client.get_table(table)
+
+    # Verify table schema.
+    assert bq_table.schema == BQ_SCHEMA
+
+    # Verify table size.
+    query = client.query(f"SELECT COUNT(1) FROM `{bq_table}`;")
+    query_result = query.result().to_dataframe()
+    # There might be extra rows due to retries.
+    assert query_result.iloc[0, 0] >= TABLE_LENGTH
+
+    # Verify that table was split into multiple requests.
+    assert len(futures) == 2
+
+
+def main(project_id, dataset):
+    write_client = bqstorage_write_client()
+    bq_client = bigquery.Client()
+    table = make_table(project_id, dataset.dataset_id, bq_client)
+
+    futures = append_rows(write_client, table)
+    verify_result(bq_client, table, futures)
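
For a concrete sense of the splitting strategy in generate_write_requests above: with the 10 MB AppendRowsRequest limit, a hypothetical 25 MB Arrow table of 100,000 rows would be cut into three record batches (the table size and row count below are made-up illustration values):

# Worked example of the chunking arithmetic in generate_write_requests.
max_request_bytes = 10 * 2**20  # 10 MB AppendRowsRequest size limit
table_nbytes = 25 * 2**20  # hypothetical pyarrow_table.nbytes
num_rows = 100_000  # hypothetical pyarrow_table.num_rows

chunk_num = int(table_nbytes / max_request_bytes) + 1  # int(2.5) + 1 == 3
chunk_size = int(num_rows / chunk_num)  # 33_333 rows per record batch
print(chunk_num, chunk_size)  # 3 33333
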
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from . import append_rows_with_arrow
+
+
+def test_append_rows_with_arrow(project_id, dataset):
+    append_rows_with_arrow.main(project_id, dataset)
