Importing Referenced Files in AWS Glue with Boto3

In this entry, you will learn how to use boto3 to download referenced files, such as RSD files, from S3 to the AWS Glue executor. Doing so will allow the JDBC driver to reference and use the necessary files.

Published: 3/26/2020    Last updated: 4/2/2020    Author: Garrett Bird


Certain providers rely on a direct local connection to a file, whereas others may depend on RSD schema files to help define the data model. In either case, the driver running in AWS Glue cannot directly access the referenced files in S3, so the job must fetch them first. This is where boto3 becomes useful: giving a JDBC driver access to the relevant files is as simple as downloading them from S3 with boto3 before the driver is used.

Initial Preparation

This entry will use the JSON provider as an example, as it can both rely on a local file as the data source and use an RSD file to define the table's metadata. If you have not already, perform the initial preparation steps outlined in this article. Once done, you will additionally want to upload your referenced files into S3. For the purposes of this entry, the following JSON and RSD files will be referenced:

people.json

{
  "people": [
    {
      "personal": {
        "age": 20,
        "gender": "M",
        "name": {
          "first": "John",
          "last": "Doe"
        }
      },
      "vehicles": [
        {
          "type": "car",
          "model": "Honda Civic",
          "insurance": {
            "company": "ABC Insurance",
            "policy_num": "12345"
          },
          "maintenance": [
            {
              "date": "07-17-2017",
              "desc": "oil change"
            },
            {
              "date": "01-03-2018",
              "desc": "new tires"
            }
          ]
        },
        {
          "type": "truck",
          "model": "Dodge Ram",
          "insurance": {
            "company": "ABC Insurance",
            "policy_num": "12345"
          },
          "maintenance": [
            {
              "date": "08-27-2017",
              "desc": "new tires"
            },
            {
              "date": "01-08-2018",
              "desc": "oil change"
            }
          ]
        }
      ],
      "source": "internet"
    },
    {
      "personal": {
        "age": 24,
        "gender": "F",
        "name": {
          "first": "Jane",
          "last": "Roberts"
        }
      },
      "vehicles": [
        {
          "type": "car",
          "model": "Toyota Camry",
          "insurance": {
            "company": "Car Insurance",
            "policy_num": "98765"
          },
          "maintenance": [
            {
              "date": "05-11-2017",
              "desc": "tires rotated"
            },
            {
              "date": "11-03-2017",
              "desc": "oil change"
            }
          ]
        },
        {
          "type": "car",
          "model": "Honda Accord",
          "insurance": {
            "company": "Car Insurance",
            "policy_num": "98765"
          },
          "maintenance": [
            {
              "date": "10-07-2017",
              "desc": "new air filter"
            },
            {
              "date": "01-13-2018",
              "desc": "new brakes"
            }
          ]
        }
      ],
      "source": "phone"
    }
  ]
}
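
The nesting in this file (people, then vehicles, then maintenance records) is what a flattened table has to unroll into columns. As a rough illustration only, and not the driver's actual implementation, the following Python sketch walks a trimmed, one-person copy of the document and emits one row per maintenance record, repeating the parent values on each row. The function name `flatten_people` and the chosen subset of columns are assumptions made for this example.

```python
# Illustrative only: a trimmed copy of people.json containing one person.
doc = {
    "people": [
        {
            "personal": {"age": 20, "name": {"first": "John", "last": "Doe"}},
            "vehicles": [
                {
                    "type": "car",
                    "model": "Honda Civic",
                    "maintenance": [
                        {"date": "07-17-2017", "desc": "oil change"},
                        {"date": "01-03-2018", "desc": "new tires"},
                    ],
                },
                {
                    "type": "truck",
                    "model": "Dodge Ram",
                    "maintenance": [
                        {"date": "08-27-2017", "desc": "new tires"},
                        {"date": "01-08-2018", "desc": "oil change"},
                    ],
                },
            ],
            "source": "internet",
        }
    ]
}


def flatten_people(document):
    """Yield one flat row per maintenance record, repeating parent values."""
    for person in document["people"]:
        for vehicle in person["vehicles"]:
            for record in vehicle["maintenance"]:
                yield {
                    "personal.name.first": person["personal"]["name"]["first"],
                    "personal.age": person["personal"]["age"],
                    "source": person["source"],
                    "type": vehicle["type"],
                    "model": vehicle["model"],
                    "date": record["date"],
                    "desc": record["desc"],
                }


rows = list(flatten_people(doc))
for row in rows:
    print(row)
```

With the full two-person file, the same walk would produce eight rows, one for each maintenance record.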

people.rsd


<api:script xmlns:api="http://apiscript.com/ns?v1" xmlns:xs="http://www.w3.org/2001/XMLSchema">

  <!-- See Column Definitions to specify column behavior and use XPaths to extract column values from JSON. -->
  <api:info title="people" desc="Generated schema file." xmlns:other="http://apiscript.com/ns?v1">
    <!-- You can modify the name, type, and column size here. -->
    <attr name="date"                 xs:type="date"    readonly="false"              other:xPath="/json/people/vehicles/maintenance/date"     />
    <attr name="desc"                 xs:type="string"  readonly="false"              other:xPath="/json/people/vehicles/maintenance/desc"     />
    <attr name="insurance.company"    xs:type="string"  readonly="false"              other:xPath="/json/people/vehicles/insurance/company"    />
    <attr name="insurance.policy_num" xs:type="string"  readonly="false"              other:xPath="/json/people/vehicles/insurance/policy_num" />
    <attr name="maintenance:_id"      xs:type="string"  readonly="false"  key="true"  other:xPath="/json/people/vehicles/maintenance/_id"      />
    <attr name="model"                xs:type="string"  readonly="false"              other:xPath="/json/people/vehicles/model"                />
    <attr name="people:_id"           xs:type="string"  readonly="false"  key="true"  other:xPath="/json/people/_id"                           />
    <attr name="personal.age"         xs:type="integer" readonly="false"              other:xPath="/json/people/personal/age"                  />
    <attr name="personal.gender"      xs:type="string"  readonly="false"              other:xPath="/json/people/personal/gender"               />
    <attr name="personal.name.first"  xs:type="string"  readonly="false"              other:xPath="/json/people/personal/name/first"           />
    <attr name="personal.name.last"   xs:type="string"  readonly="false"              other:xPath="/json/people/personal/name/last"            />
    <attr name="source"               xs:type="string"  readonly="false"              other:xPath="/json/people/source"                        />
    <attr name="type"                 xs:type="string"  readonly="false"              other:xPath="/json/people/vehicles/type"                 />
    <attr name="vehicles:_id"         xs:type="string"  readonly="false"  key="true"  other:xPath="/json/people/vehicles/_id"                  />
  </api:info>

  <api:set attr="DataModel" value="FLATTENEDDOCUMENTS" />
  <api:set attr="URI" value="/tmp/people.json" />

  <api:set attr="JSONPath" value="$.people.vehicles.maintenance;$.people.vehicles;$.people" />

  <!-- The GET method corresponds to SELECT. Here you can override the default processing of the SELECT statement. The results of processing are pushed to the schema's output. See SELECT Execution for more information. -->
  <api:script method="GET">
    <api:call op="jsonproviderGet">
      <api:push/>
    </api:call>
  </api:script>

  <!-- To add support for INSERTS please see the INSERT Execution page within the help for further information and examples. -->
  <api:script method="POST">
    <api:set attr="method" value="POST"/>
    <api:call op="jsonproviderGet">
      <api:throw code="500" desc="Inserts are not currently supported."/>
      <api:push/>
    </api:call>
  </api:script>

  <!-- To add support for UPDATES please see the UPDATE Execution page within the help for further information and examples. -->
  <api:script method="MERGE">
    <api:set attr="method" value="PUT"/>
    <api:call op="jsonproviderGet">
      <api:throw code="500" desc="Updates are not currently supported."/>
      <api:push/>
    </api:call>
  </api:script>

  <!-- To add support for DELETES please see the DELETE Execution page within the help for further information and examples. -->
  <api:script method="DELETE">
    <api:set attr="method" value="DELETE"/>
    <api:call op="jsonproviderGet">
      <api:throw code="500" desc="Deletes are not currently supported."/>
      <api:push/>
    </api:call>
  </api:script>

</api:script>
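
Uploading the two files to S3 can itself be done with boto3. The following is a minimal sketch, assuming the files sit on the local machine and that each object key should simply be the file's base name. The helper name `upload_referenced_files` and its optional `s3_client` parameter are hypothetical and exist only for this example.

```python
import os


def upload_referenced_files(paths, bucket_name, s3_client=None):
    """Upload each local file to the bucket, keyed by its base file name."""
    if s3_client is None:
        # Imported lazily so the helper can also be exercised with a stub client.
        import boto3
        s3_client = boto3.client("s3")
    uploaded = []
    for path in paths:
        key = os.path.basename(path)
        # upload_file(Filename, Bucket, Key) is the standard S3 client call.
        s3_client.upload_file(path, bucket_name, key)
        uploaded.append(key)
    return uploaded


# Example call (placeholder bucket name; requires AWS credentials):
# upload_referenced_files(["people.json", "people.rsd"], "mybucket")
```

Keeping the keys at the bucket root matches the flat `/tmp` layout that the RSD file's URI attribute expects after download.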

Writing the Glue Script

Once the necessary resources are uploaded to S3, the script will mostly be the same as in the linked article, except for the following changes:

  1. Additional imports to include boto3, botocore, and TransferConfig.
  2. Additional code to download desired files from an S3 resource. The tmp directory is an ideal destination, as all users can write to files there.
  3. The TransferConfig passed to each download sets "use_threads" to False. This helps to eliminate concurrency issues, as the file download must be finished before the driver tries to reference the files.
  4. Modified JDBC URL to rely on the imported RSD file rather than direct connection properties. This is done by setting Location to the tmp directory.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import boto3
import botocore
from boto3.s3.transfer import TransferConfig

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session

##Use boto3 to download the files referenced by the JDBC driver
s3 = boto3.resource('s3')
dlConfig = TransferConfig(use_threads=False)
try:
    s3.Bucket('mybucket').download_file('people.json', '/tmp/people.json', Config=dlConfig)
    s3.Bucket('mybucket').download_file('people.rsd', '/tmp/people.rsd', Config=dlConfig)
except botocore.exceptions.ClientError as e:
    ##Only ClientError carries a response dict; any other exception propagates unchanged
    if e.response['Error']['Code'] == "404":
        print("The object does not exist.")
    else:
        raise

##Use the CData JDBC driver to read JSON services from the people table into a DataFrame
##Note the populated JDBC URL and driver class name
source_df = sparkSession.read.format("jdbc").option("url", "jdbc:json:RTK=5246...;Location=/tmp;").option("dbtable", "people").option("driver", "cdata.jdbc.json.JSONDriver").load()

glueJob = Job(glueContext)
glueJob.init(args['JOB_NAME'], args)

##Convert DataFrames to AWS Glue's DynamicFrames Object
dynamic_dframe = DynamicFrame.fromDF(source_df, glueContext, "dynamic_df")

##Write the DynamicFrame as a file in CSV format to a folder in an S3 bucket. 
##It is possible to write to any Amazon data store (SQL Server, Redshift, etc) by using any previously defined connections.
retDatasink4 = glueContext.write_dynamic_frame.from_options(frame = dynamic_dframe, connection_type = "s3", connection_options = {"path": "s3://mybucket/outfiles"}, format = "csv", transformation_ctx = "datasink4")

glueJob.commit()

Once everything is prepared, the above script should output a CSV file to your S3 bucket, using the JSON file as the data source and the RSD file for the column mapping. Similar modifications can be made for any data source that requires a referenced file of any kind.
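
For jobs that reference more than a couple of files, the download step can be factored into a small helper. The sketch below is one possible shape, not part of the original script: the name `download_referenced_files` and its parameters are hypothetical, and `bucket` is assumed to behave like a boto3 `s3.Bucket` resource. It downloads each key into a destination directory and confirms the file exists locally before returning, so a missing file surfaces before the JDBC driver is invoked.

```python
import os


def download_referenced_files(bucket, keys, dest_dir="/tmp", config=None):
    """Download each key from the bucket into dest_dir and return the local paths.

    `bucket` is expected to behave like a boto3 s3.Bucket resource.
    """
    local_paths = []
    for key in keys:
        local_path = os.path.join(dest_dir, os.path.basename(key))
        # Bucket.download_file(Key, Filename, Config=...) blocks until the copy finishes.
        bucket.download_file(key, local_path, Config=config)
        if not os.path.isfile(local_path):
            raise FileNotFoundError(local_path)
        local_paths.append(local_path)
    return local_paths


# Possible use inside the Glue script (placeholder bucket name):
# s3 = boto3.resource('s3')
# download_referenced_files(s3.Bucket('mybucket'),
#                           ['people.json', 'people.rsd'],
#                           config=TransferConfig(use_threads=False))
```

Passing the bucket in as an argument keeps the helper independent of any particular bucket name and makes it easy to try out with a stand-in object.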


For comments or questions about this article, please contact our support team at [email protected].