Just Another Data Science Blog...

Fun with USDA NASS Data - Part 3: Querying the NASS Data

This is part three in a series of blog posts working with data from the USDA NASS database. It follows the second article, Part 2: Building Cloud Infrastructure. In this post, we'll use JupyterLab to request data from the NASS database via its API. Since we've already established our credentials, set up our infrastructure, and deployed our working container, we're ready to retrieve data and send it to AWS for storage.

Before we can start requesting data, we need to import a number of modules that will help us make proper requests and format the returned information. Explaining each module is beyond the scope of this article, but I'll point out that boto3 and requests are two of the most important: requests helps us create HTTP requests from Python, and boto3 is AWS's Python software development kit. At the bottom of the next cell are environment variables imported from the new container. These variables will allow us to query the NASS database and, at the end, upload our data to S3.

import ndjson
import requests
import random
import os
import time
import copy
from tqdm import tqdm_notebook as tn
import boto3.session

# Environment variables set in the container from Part 2
key = os.environ.get("USDAKEY")         # NASS Quick Stats API key
bucket = os.environ.get("CROP_BUCKET")  # target S3 bucket
profile = os.environ.get("AWS_LP")      # local AWS profile name
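
Since everything that follows depends on these three variables, a quick, optional guard can confirm they actually made it into the container's environment (just a sketch; raise or print whatever suits your workflow):

# Fail fast if any expected environment variable is missing
assert all([key, bucket, profile]), 'Missing USDAKEY, CROP_BUCKET, or AWS_LP'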

Now that we have the modules and variables set, we can make a test GET request to the NASS database. I'm going to retrieve information about North Carolina. Specifically, I'll select records based on a few headers/columns including commodity_desc, group_desc, statisticcat_desc, unit_desc, and state_alpha. Columns can be found here. Be sure to pass in your API key by formatting the request string as an f-string: place an f before the opening quote so that {key} is interpolated into the URL.

nc = (
    requests.get(
        f'http://quickstats.nass.usda.gov/api/api_GET/?key={key}'
        '&group_desc=INCOME&commodity_desc=COMMODITY+TOTALS'
        '&statisticcat_desc=SALES&unit_desc=$&state_alpha=NC&format=json'
    ).json()['data']
)

This request returns a list of 9627 records.

len(nc)
9627
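
As an aside, the same query can be written using requests' params argument, which handles the URL encoding for us. This sketch uses the same parameter names as above but isn't the form used in the rest of the post:

# Equivalent request built from a parameter dictionary instead of an f-string
nc_alt = requests.get(
    'http://quickstats.nass.usda.gov/api/api_GET/',
    params={
        'key': key,
        'group_desc': 'INCOME',
        'commodity_desc': 'COMMODITY TOTALS',
        'statisticcat_desc': 'SALES',
        'unit_desc': '$',
        'state_alpha': 'NC',
        'format': 'json',
    },
).json()['data']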

By selecting the first record in the list, we can examine a sample of the data we received.

nc[0]
{'county_name': '',
 'util_practice_desc': 'ALL UTILIZATION PRACTICES',
 'domain_desc': 'OPERATORS',
 'end_code': '00',
 'state_ansi': '37',
 'Value': '6,612,983,000',
 'source_desc': 'CENSUS',
 'country_name': 'UNITED STATES',
 'state_alpha': 'NC',
 'group_desc': 'INCOME',
 'county_ansi': '',
 'class_desc': 'ALL CLASSES',
 'statisticcat_desc': 'SALES',
 'watershed_code': '00000000',
 'state_name': 'NORTH CAROLINA',
 'asd_desc': '',
 'region_desc': '',
 'reference_period_desc': 'YEAR',
 'week_ending': '',
 'county_code': '',
 'CV (%)': '4.3',
 'commodity_desc': 'COMMODITY TOTALS',
 'prodn_practice_desc': 'ALL PRODUCTION PRACTICES',
 'year': 2012,
 'load_time': '2012-12-31 00:00:00',
 'short_desc': 'COMMODITY TOTALS - SALES, MEASURED IN $',
 'asd_code': '',
 'congr_district_code': '',
 'sector_desc': 'DEMOGRAPHICS',
 'agg_level_desc': 'STATE',
 'location_desc': 'NORTH CAROLINA',
 'domaincat_desc': 'OPERATORS: (1 OPERATORS)',
 'begin_code': '00',
 'country_code': '9000',
 'unit_desc': '$',
 'state_fips_code': '37',
 'freq_desc': 'ANNUAL',
 'watershed_desc': '',
 'zip_5': ''}

As you can see, there is quite a bit of data in each record, including the fields we used in our selection criteria. Let's grab records for all the states using the same criteria. We'll first create a list of all state abbreviations in the ANSI format.

states = [
    'AL', 'AK', 'AZ', 'AR', 'CA', 'CO',
    'CT', 'DE', 'FL', 'GA', 'HI', 'ID',
    'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 
    'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 
    'MO', 'MT', 'NV', 'NE', 'NH', 'NJ', 
    'NM', 'NY', 'NC', 'ND', 'OH', 'OK',
    'OR', 'PA', 'RI', 'SC', 'SD', 'TN',
    'TX', 'UT', 'VT', 'VA', 'WA', 'WV',
    'WI', 'WY'
]
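
Before firing off 50 requests, a quick sanity check on the list is cheap insurance:

# One two-letter abbreviation per state
assert len(states) == 50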

The next step is to define a function that will do the heavy lifting. We'll create a function called state_info that sends a request for each state abbreviation. It will pause a random 1 to 3 seconds between requests to space them out and avoid overloading the server. As the data is retrieved, the function performs a minor edit on one of the keys returned. The keys will eventually map to column headers in AWS Athena, and the key "CV (%)" is not a valid column name, so Athena would throw an error when querying it; we rename it to cv_per instead. We'll then write each data set to a file named after the state it holds and save it in our data folder. The files will be saved in newline-delimited JSON (NDJSON) format, which works well with AWS Glue crawlers.

def state_info(state):

    fnames = {}

    for st in tn(state, desc='GET Requests'):
        # Same query as before, parameterized by state abbreviation
        s = (
            requests.get(
                f'http://quickstats.nass.usda.gov/api/api_GET/?key={key}'
                '&group_desc=INCOME&commodity_desc=COMMODITY+TOTALS'
                f'&statisticcat_desc=SALES&unit_desc=$&state_alpha={st}&format=json'
            ).json()['data']
        )
        # Rename "CV (%)" so it becomes a valid Athena column name
        for i in s:
            i['cv_per'] = i.pop("CV (%)")

        # Write the state's records to a newline-delimited JSON file
        filename = f'state_{st}.ndjson'
        fnames[st] = filename
        with open(f'../data/{filename}', 'w') as filehandle:
            ndjson.dump(s, filehandle)

        # Wait 1-3 seconds between requests to avoid hammering the server
        time.sleep(random.randint(1, 3))

    return fnames

si = state_info(states)

Note: Because we're randomizing the time between each request, the length of time it will take to retrieve all state records can vary.

After calling the state_info function, the si variable now holds the fnames dictionary, whose keys are state abbreviations ('NC', 'TX', etc.) and whose values are the names of their corresponding ndjson files (state_NC.ndjson).
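
If you'd like to eyeball the mapping yourself, printing a couple of entries is enough (the exact contents depend on your run):

# Spot-check a couple of state-to-filename entries
for st in ['NC', 'TX']:
    print(st, '->', si[st])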

Now, piecing it all together, we can upload our files to AWS S3. We'll create an s3 resource object using the boto3 module, then stream the data into a bucket, partitioning the information by state abbreviation. Partitioning cuts down the amount of data scanned at query time, saving both time and money. When we make a request to put data into our bucket, we receive a JSON object in response; a 200 status code means our request was accepted and is 'OK'. Since we're making a request for each state, we'll send and receive a total of 50 requests and responses. Below, the upload_data function does the work of sending requests and handling the responses. If all goes well, we'll receive a single 200 status code from the function.

s3 = (
    boto3.session.Session(profile_name=profile)
    .resource('s3')
)

def upload_data(records):

    all_rs = []
    for k, v in tn(records.items(), desc='PUT Requests'):
        # Read the state's ndjson file and encode it as bytes for S3
        with open(f'../data/{v}', 'r') as f:
            byt = ndjson.dumps(ndjson.loads(f.read())).encode('utf-8')
        # Partition objects by state abbreviation under the crop-data prefix
        obj = s3.Object(bucket, f'crop-data/partition_id={k}/{v}')
        resp = obj.put(Body=byt)
        all_rs.append(resp['ResponseMetadata']['HTTPStatusCode'])
        time.sleep(2)
    # A set of the status codes received; ideally just {200}
    return set(all_rs)

upload_data(si)

{200}

After receiving a 200 response, we can check our S3 bucket to ensure all data and partitions are present.
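
One way to do that check without leaving the notebook is to list the keys under the crop-data/ prefix, reusing the s3 resource and bucket variable from above (a sketch; adjust the prefix if you changed it):

# List every object under the crop-data/ prefix; we expect one
# state_XX.ndjson key per state partition.
for obj in s3.Bucket(bucket).objects.filter(Prefix='crop-data/'):
    print(obj.key)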

Well, after all that, we are FINALLY ready to do some data analysis. It's not easy wrangling data, but it's rewarding when it's finished because you learn a lot in the process. You also learn useful coding skills that can be applied when doing an analysis. Think about it: to get to this point we had to learn about AWS resources including CloudFormation, SAM, S3, and AWS Glue. In this notebook, we used Python to wrangle data from a third-party API at the USDA. That's a lot of work just to get the data into a format you can query. Exciting stuff. The final blog post will explore the data we've wrangled here.