Hello wonderful people!
I want to build a system that loads data into BigQuery every day. Dataform was the first thing that came to my mind since it's already integrated into the GCP ecosystem and it has cool features like releases & scheduling, version control, etc.
Now, take a look at my code:
config {
  type: "operations",
  hasOutput: true,
  tags: ["raw layer"]
}

LOAD DATA OVERWRITE ${self()}
(
  category_id STRING,
  category_name STRING
)
FROM FILES (
  format = 'CSV',
  field_delimiter = ',',
  skip_leading_rows = 1,        -- Skip header row (if present)
  allow_quoted_newlines = TRUE, -- Allow newlines within quoted fields
  uris = ['gs://${constants.CLIENT_BUCKET_NAME}/categories*.csv']
)
After loading the aforementioned CSV file, I want it to be moved to another folder in the GCS bucket. I have more .sqlx files that are part of the raw layer (bronze layer or whatever it's called these days : )
Any ideas on how to do this using Dataform?
--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost
Hi @davidregalado25,
To move your CSV files to another folder in your GCS bucket after loading the data, you can extend your current setup by incorporating a JavaScript action within your Dataform project. Here's a detailed approach.
First, adjust your SQLX file so that it also captures the paths of the loaded files:
config {
  type: "operations",
  hasOutput: true,
  tags: ["raw layer"]
}

-- Load Data into BigQuery
LOAD DATA OVERWRITE ${self()}
(
  category_id STRING,
  category_name STRING
)
FROM FILES (
  format = 'CSV',
  field_delimiter = ',',
  skip_leading_rows = 1,        -- Skip header row (if present)
  allow_quoted_newlines = TRUE, -- Allow newlines within quoted fields
  uris = ['gs://${constants.CLIENT_BUCKET_NAME}/categories*.csv']
);
-- Capture the file paths
SELECT DISTINCT _FILE_NAME AS file_path
FROM ${self()};
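One caveat: the _FILE_NAME pseudocolumn is only exposed on tables backed by external data sources, so if you load into a regular (native) table the query above will not work. A minimal sketch of the alternative, reusing your column layout but otherwise illustrative, is to declare the raw layer as an external table over the same CSVs:

config {
  type: "operations",
  hasOutput: true,
  tags: ["raw layer"]
}

-- Declare the raw layer as an external table so _FILE_NAME is available
CREATE OR REPLACE EXTERNAL TABLE ${self()}
(
  category_id STRING,
  category_name STRING
)
OPTIONS (
  format = 'CSV',
  field_delimiter = ',',
  skip_leading_rows = 1,
  allow_quoted_newlines = TRUE,
  uris = ['gs://${constants.CLIENT_BUCKET_NAME}/categories*.csv']
);

-- _FILE_NAME now resolves to the source gs:// URI of each row
SELECT DISTINCT _FILE_NAME AS file_path
FROM ${self()};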
Create a separate JavaScript action file (e.g., move_files.js) within your Dataform project. This action will pick up the file paths returned by the query above and move each file to the target folder:
const {Storage} = require('@google-cloud/storage');

async function main(params) {
  const storage = new Storage();
  const sourceBucket = params.sourceBucket; // From Dataform parameters
  const targetFolder = params.targetFolder; // From Dataform parameters
  const rows = params.rows;                 // Output from your SQLX query

  for (const row of rows) {
    // _FILE_NAME returns a full gs:// URI, so strip the bucket prefix
    // to get the object name before moving it.
    const objectName = row.file_path.replace(`gs://${sourceBucket}/`, '');
    const fileName = objectName.split('/').pop();

    // Move the file to the target folder
    await storage.bucket(sourceBucket).file(objectName).move(`${targetFolder}/${fileName}`);
  }
}

module.exports = {main};
Integration in Dataform:
{
  "actions": [
    {
      "name": "move_csv_files",
      "type": "js",
      "file": "move_files.js",
      "dependencies": ["your_sqlx_file_name"], // Ensure SQLX runs first
      "params": {
        "sourceBucket": "${constants.CLIENT_BUCKET_NAME}",
        "targetFolder": "processed"
      }
    }
  ]
}
Some Considerations:
Ensure the required client library (@google-cloud/storage) is installed and available to your Dataform project.
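For example, the repository's package.json could declare it like this (the versions shown are only placeholders):

{
  "dependencies": {
    "@dataform/core": "2.9.0",
    "@google-cloud/storage": "^7.0.0"
  }
}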
Hi @ms4446,
I'm trying to follow the solution you've given here in Dataform on GCP, but when running this line:
const {Storage} = require('@google-cloud/storage');
I get the following error:
Failed to resolve child_process
Hi @domokapsky,
Yes, this is a limitation of Dataform's restricted execution environment, which does not support Node.js modules that rely on certain system functionalities like child_process.
However, you can work around this limitation by using Cloud Functions or Cloud Run to handle the file-moving operation. Here’s how you can integrate this into your Dataform workflow:
Approach Using Google Cloud Functions
1. Create a Cloud Function:
Develop a Cloud Function that moves files in GCS.
// index.js
const {Storage} = require('@google-cloud/storage');

exports.moveFiles = async (req, res) => {
  const storage = new Storage();
  const sourceBucketName = req.body.sourceBucket;
  const targetFolder = req.body.targetFolder;
  const filePaths = req.body.filePaths; // Array of file paths to move

  try {
    for (const filePath of filePaths) {
      // Strip a leading gs://<bucket>/ prefix if the caller passes full URIs
      // (e.g. values taken from the _FILE_NAME pseudocolumn).
      const objectName = filePath.replace(`gs://${sourceBucketName}/`, '');
      const fileName = objectName.split('/').pop();
      await storage.bucket(sourceBucketName).file(objectName).move(`${targetFolder}/${fileName}`);
    }
    res.status(200).send('Files moved successfully.');
  } catch (error) {
    console.error('Error moving files:', error);
    res.status(500).send('Error moving files.');
  }
};
2. Deploy the Cloud Function:
Deploy the function using the gcloud CLI:
gcloud functions deploy moveFiles \
  --runtime nodejs18 \
  --trigger-http \
  --allow-unauthenticated
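Once deployed, you can sanity-check the function with a quick HTTP call before wiring it into Dataform (the bucket, folder, and file names below are just examples):

curl -X POST "https://<YOUR_CLOUD_FUNCTION_URL>" \
  -H "Content-Type: application/json" \
  -d '{"sourceBucket": "my-client-bucket", "targetFolder": "processed", "filePaths": ["categories_2024-01-01.csv"]}'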
3. Call the Cloud Function from Dataform:
Modify your Dataform JavaScript action to call the Cloud Function via HTTP request instead of using the GCS Node.js client library directly.
const axios = require('axios');

async function main(params) {
  const url = 'https://<YOUR_CLOUD_FUNCTION_URL>'; // Replace with your Cloud Function URL
  const payload = {
    sourceBucket: params.sourceBucket,
    targetFolder: params.targetFolder,
    filePaths: params.rows.map(row => row.file_path)
  };

  try {
    const response = await axios.post(url, payload);
    console.log('Cloud Function response:', response.data);
  } catch (error) {
    console.error('Error calling Cloud Function:', error);
    throw error;
  }
}

module.exports = {main};
4. Update Dataform Configuration:
Ensure your dataform.json or configuration file reflects these changes.
{
  "actions": [
    {
      "name": "move_csv_files",
      "type": "js",
      "file": "move_files.js",
      "dependencies": ["your_sqlx_file_name"], // Ensure SQLX runs first
      "params": {
        "sourceBucket": "${constants.CLIENT_BUCKET_NAME}",
        "targetFolder": "processed"
      }
    }
  ]
}
If you prefer using Cloud Run for more control over the environment:
1. Create a Cloud Run Service:
Deploy a containerized service that uses @google-cloud/storage to interact with GCS (a minimal sketch is shown after this section).
2. Call Cloud Run from Dataform:
Use the same axios call method in the Dataform JavaScript action, pointing it at the Cloud Run service URL.
By using Cloud Functions or Cloud Run, you can handle more complex file operations while leveraging Dataform for the core data management and orchestration. This method bypasses the limitations of Dataform's execution environment and provides a flexible solution for your use case.
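Here is that minimal sketch of a Cloud Run service, assuming a simple Express app; the route name, port handling, and gs:// prefix handling mirror the Cloud Function example above and are illustrative rather than a tested implementation:

// server.js -- minimal Cloud Run service sketch
const express = require('express');
const {Storage} = require('@google-cloud/storage');

const app = express();
app.use(express.json());

const storage = new Storage();

app.post('/move-files', async (req, res) => {
  const {sourceBucket, targetFolder, filePaths} = req.body;
  try {
    for (const filePath of filePaths) {
      // Strip a leading gs://<bucket>/ prefix if full URIs are passed in
      const objectName = filePath.replace(`gs://${sourceBucket}/`, '');
      const fileName = objectName.split('/').pop();
      await storage.bucket(sourceBucket).file(objectName).move(`${targetFolder}/${fileName}`);
    }
    res.status(200).send('Files moved successfully.');
  } catch (error) {
    console.error('Error moving files:', error);
    res.status(500).send('Error moving files.');
  }
});

// Cloud Run provides the listening port via the PORT environment variable
const port = process.env.PORT || 8080;
app.listen(port, () => console.log(`Listening on port ${port}`));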
Thanks for that insightful and detailed answer @ms4446! Shame about the limitations, but this is so helpful for understanding the capabilities and limitations of Dataform in GCP as well as how to implement workarounds.
1. I wasn't using external tables. Now I'm using external tables since that is the only way I can access the _FILE_NAME pseudocolumn.
2. I got the same "Failed to resolve child_process" error. I will move the logic to a Cloud Function as suggested.
Thanks for the replies, mate!
--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost
Hello @ms4446, I tried to replicate the invocation of the Cloud Function from Dataform without success; my move_csv_file.js file is not executed after my .sqlx file.
I also tried to invoke the JS function directly from a .sqlx file, obtaining this result:
[object Promise]
My Cloud Function is not running.
I found this other documentation that could help with this case:
Work with remote functions in BigQuery
ref: https://cloud.google.com/bigquery/docs/remote-functions?hl=es-419#console
I understand that if it can be run in BigQuery, it should also be possible to run it through Dataform, so I will try to test it. I would also like to know whether there is any possibility of adding this functionality to the Dataform roadmap?
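For anyone trying the same route, here is a rough sketch of how it could look from Dataform. The project, dataset, and connection names are placeholders, and the HTTP endpoint behind the remote function must implement BigQuery's remote function request/response contract (read the "calls" array from the request body, return a JSON object with a "replies" array):

config {
  type: "operations",
  tags: ["raw layer"]
}

-- Assumes a remote function was created once in BigQuery, for example:
--   CREATE OR REPLACE FUNCTION `my-project.my_dataset.move_file`(file_path STRING)
--   RETURNS STRING
--   REMOTE WITH CONNECTION `my-project.us.my_gcs_connection`
--   OPTIONS (endpoint = 'https://<YOUR_CLOUD_FUNCTION_URL>');

-- Call the remote function for every file that fed the external table
SELECT `my-project.my_dataset.move_file`(file_path) AS move_result
FROM (
  SELECT DISTINCT _FILE_NAME AS file_path
  FROM ${ref("your_sqlx_file_name")}
);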