
DATAFORM - Move CSV files after loading the data

Hello wonderful people!

I want to build a system that loads data into BigQuery every day. Dataform was the first thing that came to my mind since it's already integrated into the GCP ecosystem and it has cool features like releases & scheduling, version control, etc.

Now, take a look at my code:

 

 

config { 
  type: "operations",
  hasOutput: true,
  tags: ["raw layer"]
}

LOAD DATA OVERWRITE ${self()}
( 
  category_id STRING,
  category_name STRING
)
FROM FILES ( 
  format = 'CSV',
  field_delimiter = ',',
  skip_leading_rows = 1,  -- Skip header row (if present)
  allow_quoted_newlines = TRUE,  -- Allow newlines within quoted fields
  uris = ['gs://${constants.CLIENT_BUCKET_NAME}/categories*.csv']
)

After loading the aforementioned CSV file, I want it to be moved to another folder in the GCS bucket. I have more .sqlx files that are part of the raw layer (bronze layer, or whatever it's called these days) :)

Any ideas on how to do this using Dataform?

--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost


Hi @davidregalado25 ,

To move your CSV files to another folder in your GCS bucket after loading the data, you can extend your current setup by incorporating a JavaScript action within your Dataform project. Here's a detailed approach:

  • Your existing configuration is correct, as the hasOutput flag indicates that your SQLX file produces a result, which will be leveraged later.
  • Modify your SQLX query to return the full paths of the CSV files that were successfully loaded.

Here’s how you can adjust your SQLX file:

 
config { 
  type: "operations",
  hasOutput: true,
  tags: ["raw layer"]
}

-- Load Data into BigQuery
LOAD DATA OVERWRITE ${self()}
( 
  category_id STRING,
  category_name STRING
)
FROM FILES ( 
  format = 'CSV',
  field_delimiter = ',',
  skip_leading_rows = 1,  -- Skip header row (if present)
  allow_quoted_newlines = TRUE,  -- Allow newlines within quoted fields
  uris = ['gs://${constants.CLIENT_BUCKET_NAME}/categories*.csv']
);

-- Capture the file paths
SELECT DISTINCT _FILE_NAME AS file_path
FROM ${self()};

Create a separate JavaScript action file (e.g., move_files.js) within your Dataform project. This action will:

  1. Take the list of file paths from the previous SQLX output.
  2. Use the GCS Node.js client library to move each file to your desired destination folder.
 
const {Storage} = require('@google-cloud/storage');

async function main(params) {
  const storage = new Storage();
  const sourceBucket = params.sourceBucket; // From Dataform parameters
  const targetFolder = params.targetFolder; // From Dataform parameters

  const rows = params.rows; // Output from your SQLX query
  for (const row of rows) {
    // _FILE_NAME returns the full URI (gs://bucket/path/file.csv),
    // so strip the scheme and bucket to get the object path.
    const objectPath = row.file_path.replace(`gs://${sourceBucket}/`, '');
    const fileName = objectPath.split('/').pop();

    // Move the file to the target folder within the same bucket
    await storage.bucket(sourceBucket).file(objectPath).move(`${targetFolder}/${fileName}`);
  }
}

module.exports = {main};

Integration in Dataform:

  • In your Dataform project's dataform.json or a separate configuration file, specify the JavaScript action file and parameters for the source bucket and target folder.
  • Add both your SQLX file and the JavaScript action to your Dataform pipeline, ensuring the action runs after the SQLX file.
 
{
  "actions": [
    {
      "name": "move_csv_files",
      "type": "js",
      "file": "move_files.js",
      "dependencies": ["your_sqlx_file_name"], // Ensure SQLX runs first
      "params": {
        "sourceBucket": "${constants.CLIENT_BUCKET_NAME}",
        "targetFolder": "processed" 
      }
    }
  ]
}

Some Considerations:

  1.  Ensure you have the necessary dependencies installed in your Dataform environment to use the GCS client library (@google-cloud/storage).
  2. Implement robust error handling in your JavaScript action to catch any issues during file movement.
  3. Use Dataform's scheduling features to automate the daily execution of this pipeline.
  4. Ensure the action is idempotent by checking whether the file already exists in the target folder before moving it (see the sketch after this list).
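
For points 2 and 4, here is a minimal sketch (not a drop-in implementation) of what an idempotent move with basic error handling could look like, assuming the @google-cloud/storage client is available wherever the move logic ends up running; the helper name moveFileIdempotently is only illustrative:

const {Storage} = require('@google-cloud/storage');

const storage = new Storage();

// Move one object, skipping it if a copy already exists in the target folder
// and logging individual failures instead of silently dropping them.
async function moveFileIdempotently(bucketName, objectPath, targetFolder) {
  const fileName = objectPath.split('/').pop();
  const destination = `${targetFolder}/${fileName}`;

  // Idempotency check: skip files that were already moved in a previous run.
  const [alreadyMoved] = await storage.bucket(bucketName).file(destination).exists();
  if (alreadyMoved) {
    console.log(`Skipping ${objectPath}: ${destination} already exists`);
    return;
  }

  try {
    await storage.bucket(bucketName).file(objectPath).move(destination);
    console.log(`Moved ${objectPath} -> ${destination}`);
  } catch (error) {
    console.error(`Failed to move ${objectPath}:`, error);
    throw error; // or collect failures and report them at the end of the run
  }
}

module.exports = {moveFileIdempotently};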

Hi @ms4446,

I'm trying to follow the solution you have given here in Dataform in GCP, but when running this line:

const {Storage} = require('@google-cloud/storage');

I get an error stating:

Failed to resolve child_process

I also found this answer you gave on a different post https://www.googlecloudcommunity.com/gc/Data-Analytics/Backfilling-tables-with-via-Dataform/m-p/6144... that involved importing `child_process`, but this obviously also didn't work in my environment.
 
Is this a limitation of Dataform in GCP or is there some way to resolve this?
Thanks!

Hi @domokapsky ,

Yes, this is a limitation of Dataform's restricted execution environment, which does not support Node.js modules that rely on certain system functionality, such as child_process.

However, you can work around this limitation by using Cloud Functions or Cloud Run to handle the file-moving operation. Here’s how you can integrate this into your Dataform workflow:

Approach Using Google Cloud Functions

1. Create a Cloud Function:

Develop a Cloud Function that moves files in GCS.

// index.js

const {Storage} = require('@google-cloud/storage');

exports.moveFiles = async (req, res) => {
  const storage = new Storage();
  const sourceBucketName = req.body.sourceBucket;
  const targetFolder = req.body.targetFolder;
  const filePaths = req.body.filePaths; // Array of file paths to move

  try {
    for (const filePath of filePaths) {
      // _FILE_NAME values are full URIs (gs://bucket/path/file.csv);
      // strip the scheme and bucket to get the object path.
      const objectPath = filePath.replace(`gs://${sourceBucketName}/`, '');
      const fileName = objectPath.split('/').pop();
      await storage.bucket(sourceBucketName).file(objectPath).move(`${targetFolder}/${fileName}`);
    }
    res.status(200).send('Files moved successfully.');
  } catch (error) {
    console.error('Error moving files:', error);
    res.status(500).send('Error moving files.');
  }
};

2. Deploy the Cloud Function:

Deploy the function using the gcloud CLI:

gcloud functions deploy moveFiles \
  --runtime nodejs18 \
  --trigger-http \
  --allow-unauthenticated

3. Call the Cloud Function from Dataform:

Modify your Dataform JavaScript action to call the Cloud Function via HTTP request instead of using the GCS Node.js client library directly.

 

const axios = require('axios');

async function main(params) {
  const url = 'https://<YOUR_CLOUD_FUNCTION_URL>'; // Replace with your Cloud Function URL

  const payload = {
    sourceBucket: params.sourceBucket,
    targetFolder: params.targetFolder,
    filePaths: params.rows.map(row => row.file_path)
  };

  try {
    const response = await axios.post(url, payload);
    console.log('Cloud Function response:', response.data);
  } catch (error) {
    console.error('Error calling Cloud Function:', error);
    throw error;
  }
}

module.exports = {main};

4. Update Dataform Configuration:

Ensure your dataform.json or configuration file reflects these changes.

 

{
  "actions": [
    {
      "name": "move_csv_files",
      "type": "js",
      "file": "move_files.js",
      "dependencies": ["your_sqlx_file_name"], // Ensure SQLX runs first
      "params": {
        "sourceBucket": "${constants.CLIENT_BUCKET_NAME}",
        "targetFolder": "processed"
      }
    }
  ]
}

If you prefer using Cloud Run for more control over the environment:

1. Create a Cloud Run Service:

  • Write the file-moving logic in a Node.js application.
  • Use @google-cloud/storage to interact with GCS.
  • Deploy this application to Cloud Run (a minimal sketch follows below).
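
As a rough illustration, a minimal version of such a Cloud Run service could look like the sketch below. It assumes Express for the HTTP handling and the same JSON payload as the Cloud Function example above (sourceBucket, targetFolder, filePaths); treat it as a starting point rather than a finished implementation.

// server.js
const express = require('express');
const {Storage} = require('@google-cloud/storage');

const app = express();
app.use(express.json());
const storage = new Storage();

app.post('/', async (req, res) => {
  const {sourceBucket, targetFolder, filePaths} = req.body;
  try {
    for (const filePath of filePaths) {
      // Strip the gs://bucket/ prefix in case full URIs are passed in.
      const objectPath = filePath.replace(`gs://${sourceBucket}/`, '');
      const fileName = objectPath.split('/').pop();
      await storage.bucket(sourceBucket).file(objectPath).move(`${targetFolder}/${fileName}`);
    }
    res.status(200).send('Files moved successfully.');
  } catch (error) {
    console.error('Error moving files:', error);
    res.status(500).send('Error moving files.');
  }
});

// Cloud Run provides the port to listen on via the PORT environment variable.
const port = process.env.PORT || 8080;
app.listen(port, () => console.log(`Listening on port ${port}`));

Deploy it with gcloud run deploy as usual and note the service URL for the next step.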

2. Call Cloud Run from Dataform:

  • Replace the Cloud Function URL with your Cloud Run service URL.
  • Use the same axios call method in the Dataform JavaScript action, or an authenticated call as sketched below.
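
If the Cloud Run service is not deployed with unauthenticated access, a plain axios POST will be rejected; the request needs an ID token minted for the service. Here is a sketch of such an authenticated call, assuming the google-auth-library package can actually be resolved in the environment where this code runs (the callMoveService name is only illustrative):

const {GoogleAuth} = require('google-auth-library');

async function callMoveService(serviceUrl, payload) {
  // Mint an ID token bound to the Cloud Run service URL and POST the payload with it.
  const auth = new GoogleAuth();
  const client = await auth.getIdTokenClient(serviceUrl);
  const response = await client.request({
    url: serviceUrl,
    method: 'POST',
    data: payload,
  });
  console.log('Cloud Run response:', response.data);
  return response.data;
}

module.exports = {callMoveService};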

By using Cloud Functions or Cloud Run, you can handle more complex file operations while leveraging Dataform for the core data management and orchestration. This method bypasses the limitations of Dataform’s execution environment and provides a flexible solution for your use case.

Thanks for that insightful and detailed answer @ms4446! Shame about the limitations, but this is so helpful for understanding the capabilities and limitations of Dataform in GCP as well as how to implement workarounds.

1. I wasn't using external tables. Now I'm using external tables since that is the only way I can access the _FILE_NAME pseudocolumn.

2. I got the same "Failed to resolve child_process" error. I will move the logic to a cloud function as suggested.

Thanks for the replies, mate!

--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost 

Hello @ms4446, I tried to replicate the invocation of the Cloud Function from Dataform without success; my move_csv_file.js file is not executed after my .sqlx file.

[screenshots attached]

I also tried to invoke the js function from a .sqlx file as follows: 

[screenshot attached]

Obtaining this result:

[object Promise]

My Cloud Function is not running.

I found this other documentation that could help with my case:
Work with remote functions in BigQuery
ref: https://cloud.google.com/bigquery/docs/remote-functions?hl=es-419#console

I understand that if it can be run in BigQuery, it should also be possible to run it through Dataform; I will try to test it. I would also like to know whether there is any possibility of adding this functionality to the Dataform roadmap?