Data Fusion to load data from GCS to SFTP or FTP

Hi Team, 

I'm looking for a solution to load data from a GCS bucket to an SFTP or FTP server using Data Fusion. I can see FTP (Deprecated) among the Data Fusion sources, but I can't see FTP or SFTP in the Sink tab. Is it possible to load data from GCS to an SFTP or FTP server using Data Fusion? If so, please provide the steps to achieve this. Any help would be appreciated.

Thanks,
Purna

ACCEPTED SOLUTION

Although Cloud Data Fusion doesn't directly support SFTP or FTP as a sink, you can achieve this data transfer by combining Data Fusion with other Google Cloud components.

The first step is to read the data from the GCS bucket. In the Data Fusion UI, create a new pipeline and drag the "Google Cloud Storage" source plugin onto the canvas. Then configure the plugin with the bucket name and file pattern so that it reads the files you want to transfer.

Next, if the data must be processed or transformed before it is sent to the SFTP/FTP server, add a transformation plugin such as "JavaScript" or "Python" to the pipeline. For instance, the "JavaScript Transform" plugin can manipulate data fields record by record. If no transformation is needed, skip this step.
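
As a rough illustration of such a transformation step in Python, the sketch below assumes the `transform(record, emitter, context)` function shape used by the Python transform plugin. The field names `name` and `amount` are hypothetical, and the `ListEmitter` class is only a stand-in so the function can be tried outside Data Fusion.

```python
def transform(record, emitter, context):
    """Normalize one record and pass it downstream."""
    cleaned = dict(record)
    # Hypothetical fields: trim and upper-case a name, convert an amount to a number
    cleaned['name'] = str(record.get('name', '')).strip().upper()
    cleaned['amount'] = float(record.get('amount', 0))
    emitter.emit(cleaned)


class ListEmitter:
    """Stand-in for the pipeline's emitter, for local testing only."""

    def __init__(self):
        self.records = []

    def emit(self, record):
        self.records.append(record)


# Local try-out with one sample record
emitter = ListEmitter()
transform({'name': '  alice ', 'amount': '12.50'}, emitter, None)
```

Inside a pipeline, only the `transform` function would go into the plugin's script field; the emitter is supplied by the plugin.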

The critical component of the solution is custom logic that transfers the processed data to the SFTP/FTP server. One way to do this is to add a "PythonEvaluator" plugin to the pipeline, which runs a custom Python script. Note that this plugin is a transform that processes records rather than a standalone action, and that the script runs in the pipeline's execution environment (typically the Dataproc cluster that executes the pipeline), not on the Data Fusion instance itself. Any libraries the script needs, such as paramiko for SFTP, must therefore be installed in that environment. The paramiko library is a widely used tool for handling SFTP connections and file transfers.

The Python script below is a basic example of using paramiko to transfer files from a local directory to an SFTP server:

import os
import posixpath

import paramiko


def transfer_to_sftp(host, port, username, password, local_path, remote_path):
    """Upload every regular file in local_path to remote_path on the SFTP server."""
    transport = paramiko.Transport((host, port))
    try:
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            # Upload each regular file; subdirectories are skipped
            for name in os.listdir(local_path):
                local_file = os.path.join(local_path, name)
                if os.path.isfile(local_file):
                    # Remote SFTP paths use forward slashes regardless of the local OS
                    sftp.put(local_file, posixpath.join(remote_path, name))
        finally:
            sftp.close()
    finally:
        # Close the connection even if an upload fails
        transport.close()


# Parameters (replace with your actual credentials and paths)
host = 'your_sftp_server'
port = 22
username = 'your_username'
password = 'your_password'
local_path = '/path/to/local/directory'
remote_path = '/path/to/remote/directory'

# Call the transfer function
transfer_to_sftp(host, port, username, password, local_path, remote_path)

Once the custom script is prepared and integrated, save and deploy the pipeline in Data Fusion. Make sure that the environment running the pipeline can reach both the GCS bucket and the SFTP/FTP server over the network, for example that firewall rules allow outbound connections to the server's port. After deployment, run the pipeline and monitor its progress to verify that the data was transferred successfully.
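
A quick way to test the network path is to attempt a plain TCP connection to the server's port. The helper below is a minimal sketch using only the standard library; `can_reach` is a hypothetical name, and the result only reflects reachability from the machine where it runs, so it would need to be run in the same environment as the pipeline.

```python
import socket


def can_reach(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS lookup failures
        return False
```

For example, `can_reach('your_sftp_server', 22)` returning False would point to a firewall, routing, or DNS problem rather than a credentials problem.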

There are a few additional considerations for a successful implementation. First, the service account that runs the pipeline needs permission to read from the GCS bucket, which is managed through IAM roles in Google Cloud. Write access to the SFTP/FTP server is not governed by IAM; it depends on the server's own credentials and on the network rules mentioned above. Second, the Python script should include error handling: use try-except blocks to catch exceptions and log errors so that failed transfers are visible and can be retried. Lastly, handle credentials securely. Storing sensitive information, such as SFTP/FTP usernames and passwords, in Cloud Secret Manager and retrieving it at run time is highly recommended over hard-coding it in the script.
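
The error-handling point can be sketched as a small retry wrapper built on try-except and the standard `logging` module. The name `run_with_retries` and its parameters are hypothetical, and the retry policy (fixed delay, three attempts) is only an assumption for illustration.

```python
import logging
import time

logger = logging.getLogger("sftp_transfer")


def run_with_retries(action, attempts=3, delay_seconds=2.0):
    """Call action(); on failure, log the error and retry up to `attempts` times."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as exc:
            logger.error("Attempt %d of %d failed: %s", attempt, attempts, exc)
            if attempt == attempts:
                # Out of retries: re-raise so the failure is not silently lost
                raise
            time.sleep(delay_seconds)
```

A transfer could then be wrapped as `run_with_retries(lambda: transfer_to_sftp(host, port, username, password, local_path, remote_path))`. In real use it may be better to catch narrower exception types, such as `paramiko.SSHException` and `OSError`, instead of `Exception`.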

An alternative is to use Cloud Functions. A Cloud Function can be triggered whenever a new file is added to the GCS bucket and then transfer that file to the SFTP/FTP server. The following example shows such a function using the paramiko library:

import os
import posixpath

import paramiko
from google.cloud import storage


def transfer_to_sftp(event, context):
    """Triggered by a new GCS object; copies that object to the SFTP server."""
    # Get the file details from the event
    bucket_name = event['bucket']
    file_name = event['name']

    # Initialize the GCS client
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    # Download the file to a temporary location. Only the base name is used,
    # because object names may contain '/' and those directories do not exist in /tmp
    base_name = os.path.basename(file_name)
    local_path = os.path.join('/tmp', base_name)
    blob.download_to_filename(local_path)

    # SFTP server details (placeholders; replace with your own values)
    host = 'your_sftp_server'
    port = 22
    username = 'your_username'
    password = 'your_password'
    remote_path = '/path/to/remote/directory'

    try:
        # Transfer the file to the SFTP server
        transport = paramiko.Transport((host, port))
        try:
            transport.connect(username=username, password=password)
            sftp = paramiko.SFTPClient.from_transport(transport)
            # Remote SFTP paths use forward slashes
            sftp.put(local_path, posixpath.join(remote_path, base_name))
            sftp.close()
        finally:
            transport.close()
    finally:
        # Clean up the temporary file even if the transfer fails
        os.remove(local_path)
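
The Cloud Function above keeps the SFTP password as a placeholder in the source code. As a hedged sketch of the Secret Manager approach, the helpers below read a secret at run time with the `google-cloud-secret-manager` client library. The function names, the project ID, and the secret ID are hypothetical, and the function's service account is assumed to have access to the secret.

```python
def secret_version_name(project_id, secret_id, version="latest"):
    """Build the resource name Secret Manager uses for a secret version."""
    return f"projects/{project_id}/secrets/{secret_id}/versions/{version}"


def read_secret(project_id, secret_id, version="latest"):
    """Fetch a secret value as text (requires google-cloud-secret-manager)."""
    # Imported here so the name helper above works without the client library
    from google.cloud import secretmanager

    client = secretmanager.SecretManagerServiceClient()
    response = client.access_secret_version(
        request={"name": secret_version_name(project_id, secret_id, version)}
    )
    return response.payload.data.decode("UTF-8")
```

With this in place, the hard-coded value could be replaced by something like `password = read_secret('your-project-id', 'sftp-password')`, where both arguments are placeholders.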

 
