Showing posts with label Snowflake. Show all posts

Thursday, November 28, 2024

Snowflake's REDUCE Function to Glean Insights from SEC Filing Data

 Summary

  • What is the REDUCE Higher-order Function?
  • Learn the JSON structure of the SEC company filing from an example.
  • What fiscal end periods are represented in the JSON document?
  • Answer the question using the REDUCE higher-order function in Snowflake.

What is the REDUCE Higher-order Function?

Snowflake recently made the REDUCE higher-order function generally available. It adds another powerful, easy-to-use tool for processing arrays. REDUCE accumulates the values of an array into a single value. It takes three inputs: an array, an initial accumulator value, and a lambda expression that defines how each array element is folded into the accumulator.

REDUCE( <array> , <init> , <lambda_expression> ) 
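
For readers who think in Python, the accumulate-into-one-value pattern can be sketched with functools.reduce. This is only an analogy, not Snowflake code: Python takes the lambda first and the initial value last, while Snowflake's REDUCE takes the lambda last. The sample list is made up for illustration.

```python
from functools import reduce

# Hypothetical sample array; in Snowflake this would be the <array> argument.
values = [1, 2, 3, 4]

# acc is the accumulator and elem is the current array element.
# The final argument (0) plays the role of <init>.
total = reduce(lambda acc, elem: acc + elem, values, 0)

print(total)
```

The lambda runs once per element, and whatever it returns becomes the accumulator for the next element.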

The JSON Structure of the SEC Filing

My goal is to understand the cash carried by Kimberly-Clark Corporation on its balance sheet. The company is known for products such as Huggies and Cottonelle. I want to list all the fiscal period end dates in the data. The data filed with the SEC can be inconsistent, especially in the fiscal periods it covers, so knowing which fiscal periods are present is invaluable. The SEC filing may also contain repeated data. Investors wish to compare current results with past results, so a Q2 report also includes Q1 and Q2 data from the previous year. As a result, the same period can appear more than once.

Note: You can learn about my External Table structure here. In my LinkedIn profile, you can read a series of blogs about my setup to query SEC filings.

Here's the JSON structure we will use in the REDUCE function:

{
  "cik": 55785,
  "description": "Amount of currency on hand as well as demand deposits with banks or financial institutions. Includes other kinds of accounts that have the general characteristics of demand deposits.",
  "entityName": "KIMBERLY-CLARK CORPORATION",
  "label": "Cash and Cash Equivalents, at Carrying Value",
  "tag": "CashAndCashEquivalentsAtCarryingValue",
  "taxonomy": "us-gaap",
  "units": {
    "USD": [
      {
        "accn": "0001193125-10-038621",
        "end": "2006-12-31",
        "filed": "2010-02-24",
        "form": "10-K",
        "fp": "FY",
        "frame": "CY2006Q4I",
        "fy": 2009,
        "val": 361000000
      },
      {
        "accn": "0000055785-09-000026",
        "end": "2007-12-31",
        "filed": "2009-08-07",
        "form": "10-Q",
        "fp": "Q2",
        "fy": 2009,
        "val": 473000000
      }
    ]
  }
}

What fiscal end periods are represented in the JSON document?

I have created an external table called CONS_STAPLES_CASH_AND_CASH_EQUIVALENTS_ET. For the REDUCE function, the input is the path to the array:

Path to the Array Elements:

VALUE:"units":"USD" 

I want a concatenated string of all the fiscal period end dates in the JSON document. Each date is stored under the "end" key of an array element. The init parameter is the empty string '', which is the starting value of the accumulator. In the lambda expression, arg1 is the accumulator and arg2 is the array element currently being processed.

Lambda Expression and the Query:

(arg1, arg2) -> arg1 || ' ' || arg2:"end" || ', '

SELECT 
    TICKER_SYMBOL, 
    VALUE, 
    REDUCE(VALUE:"units":"USD", '', (arg1, arg2) -> arg1 || ' ' || arg2:"end" || ', ') FISCAL_PERIOD_END_DATES 
FROM 
    CONS_STAPLES_CASH_AND_CASH_EQUIVALENTS_ET;

When I execute the query, the REDUCE function retrieves the value for the "end" key, concatenates it to the accumulator, and returns it (Exhibit 1). The screenshot shows that for Kimberly-Clark Corp (KMB), the JSON has data from the fiscal period ending 2006-12-31. But for Target (TGT), the data in the JSON is from 2016-01-30.

Exhibit 1: The Fiscal Period End Date Returned by the REDUCE function.

Snowflake Snowsight
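
To make the accumulation concrete, here is a rough Python equivalent of the lambda above. It runs on a trimmed copy of the sample JSON and keeps only the keys needed here. It is a sketch of the logic, not of how Snowflake executes the query.

```python
import json
from functools import reduce

# Trimmed copy of the sample document shown earlier (only "end" is needed).
doc = json.loads("""
{
  "units": {
    "USD": [
      {"end": "2006-12-31", "fp": "FY", "val": 361000000},
      {"end": "2007-12-31", "fp": "Q2", "val": 473000000}
    ]
  }
}
""")

# Same shape as (arg1, arg2) -> arg1 || ' ' || arg2:"end" || ', '
fiscal_period_end_dates = reduce(
    lambda acc, elem: acc + " " + elem["end"] + ", ",
    doc["units"]["USD"],
    "",  # init: the accumulator starts as an empty string
)

print(repr(fiscal_period_end_dates))  # ' 2006-12-31,  2007-12-31, '
```

Note the leading space and the trailing ", " in the result. They come straight from the lambda, which adds a space before and a comma after every date.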

I can also tell that the SEC filing has duplicate data that I must handle in my query. For example, I can see that the 2007-12-31 is represented multiple times in the file I downloaded from the SEC (Exhibit 2).

Exhibit 2: Fiscal Period End Dates Accumulated By the REDUCE Higher-order Function.

SEC.GOV
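
One way to handle the repeated dates is to let the accumulator be a list and append a date only when it has not been seen yet. The Python sketch below illustrates that idea. The list of dates is hypothetical and is not taken from the actual filing.

```python
from functools import reduce

# Hypothetical end dates with repetitions, in the order they appear in a file.
end_dates = ["2006-12-31", "2007-12-31", "2007-12-31", "2008-12-31", "2007-12-31"]

def add_if_new(acc, end):
    # Keep the accumulator unchanged when the date is already present.
    return acc if end in acc else acc + [end]

distinct_end_dates = reduce(add_if_new, end_dates, [])
summary = ", ".join(distinct_end_dates)

print(summary)
```

The first occurrence of each date is kept, so the original order of the filing is preserved.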

I can quickly see the data in my JSON files downloaded from the SEC. I did not have to use a LATERAL FLATTEN to get at the data. The REDUCE function boosts my efficiency when I am dealing with JSON data. Try out Snowflake's REDUCE and other Higher-order functions; they will make you more productive.

Saturday, November 23, 2024

Purchase Price Matters: Implementing Volume-Weighted Average Price (VWAP) in Snowflake

 Summary:

  • A brief description of VWAP and its importance in trading and asset management.

  • The process to calculate VWAP.

  • An overview of the Snowflake features used to implement VWAP.

  • The architecture of VWAP implementation in Snowflake.

  • Code examples

  • Examples of charting Microsoft's and Nestle's VWAP in Python.

What is VWAP?

Volume-weighted Average Price (VWAP) is a price signal that takes trading volume into account. The logic behind the VWAP is simple: if investors think an asset is undervalued at its current price, they will purchase more of that asset. Investors use the VWAP as a benchmark price to make buying or selling decisions. If an asset is currently trading above the VWAP for the day, a trader may decide to sell or short the asset, expecting the price to revert to the VWAP line and give the trader a handsome profit. A trader may consider taking a long position if the asset's current price is below the VWAP.

A portfolio manager looking to acquire assets for her fund may use VWAP as the price to beat - a purchase price at or below VWAP would be considered reasonable. The portfolio manager would feel happy that she did not overpay for an asset.

Purchase Price Matters

The title of this article—Purchase Price Matters—comes from the excellent interview conducted by Nicolai Tangen (CEO of Norges Bank Investment Management) of Marc Rowan (CEO of Apollo Global Management, Inc.). Marc uses this phrase to state that every investment is a value investment. If you overpay for an asset, your investment returns will be lower - a simple yet profound thought. You can listen to the interview in the podcast - In Good Company With Nicolai Tangen. There are many amazing interviews in this podcast. These are three other episodes I would highly recommend:

VWAP Calculation

Investment firms may have their proprietary methodology for calculating VWAP. To implement VWAP in Snowflake, I have followed the method outlined in Investopedia, considering data availability and simplicity.

Here are the steps in this method:

  • Take the average of the high, low, and close prices for each period. This is the Typical Price for the asset. If your VWAP period is 5 minutes, you take this average for every 5-minute period during the trading day.

  Typical Price (TP) = (High + Low + Close) / 3

  • In my calculation, I used only the closing price for a period as my Typical Price.

  • Next, multiply the Typical Price by the Trading Volume in this period.

  Typical Price Volume (TPV) = Typical Price * Volume

  • The VWAP for a single period is the Typical Price Volume divided by the Volume. For the first 5-minute period, the VWAP therefore equals the Typical Price.

  VWAP = Typical Price Volume / Volume

  • The final step is to calculate the cumulative TPV over a longer window (e.g., 60 minutes, a day, several days, or a year) and divide it by the sum of the volume over the same window.

  Cumulative VWAP = (TPV 1st 5-Min + TPV 2nd 5-Min) / (Vol 1st 5-Min + Vol 2nd 5-Min)
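
The steps above can be sketched in a few lines of Python. The two 5-minute bars below are made-up numbers chosen to keep the arithmetic easy to follow, and the variable names are illustrative.

```python
# Hypothetical 5-minute bars (prices and volumes are made up).
bars = [
    {"high": 10.0, "low": 9.0, "close": 9.5, "volume": 100},
    {"high": 11.0, "low": 10.0, "close": 10.5, "volume": 300},
]

def typical_price(bar):
    # Typical Price (TP) = (High + Low + Close) / 3
    return (bar["high"] + bar["low"] + bar["close"]) / 3

cumulative_tpv = 0.0
cumulative_volume = 0
cumulative_vwaps = []

for bar in bars:
    tpv = typical_price(bar) * bar["volume"]  # Typical Price Volume
    cumulative_tpv += tpv
    cumulative_volume += bar["volume"]
    cumulative_vwaps.append(cumulative_tpv / cumulative_volume)

print(cumulative_vwaps)
```

After the first bar, the cumulative VWAP equals that bar's Typical Price (9.5). After the second bar, it is (950 + 3150) / (100 + 300) = 10.25, pulled toward the second bar because that bar traded more volume.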

Data Provider

I used Polygon.IO as the data provider. In its free tier, Polygon provides a trade aggregates API that aggregates trades over windows of 1 minute, 5 minutes, hours, days, weeks, or months. I have used this data to demonstrate the VWAP implementation in Snowflake. The aggregates data is in the format:

{
  "adjusted": true,
  "next_url": "https://api.polygon.io/v2/aggs/ticker/AAPL/range/1/day/1578114000000/2020-01-10?cursor=bGltaXQ9MiZzb3J0PWFzYw",
  "queryCount": 1,
  "request_id": "6a7e466379af0a71039d60cc78e72282",
  "results": [
    {
      "c": 75.0875,
      "h": 75.15,
      "l": 73.7975,
      "n": 1,
      "o": 74.06,
      "t": 1577941200000,
      "v": 135647456,
      "vw": 74.6099
    }
  ],
  "resultsCount": 1,
  "status": "OK",
  "ticker": "AAPL"
}
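
As a quick illustration of the keys the pipeline relies on later, this Python sketch parses a trimmed copy of the sample response. It reads "t" as a Unix timestamp in milliseconds, "c" as the closing price, and "v" as the volume, which is how the Snowflake code further down treats them. The output field names are my own.

```python
import json
from datetime import datetime, timezone

# Trimmed copy of the sample aggregates response shown above.
payload = json.loads("""
{
  "ticker": "AAPL",
  "results": [
    {"c": 75.0875, "h": 75.15, "l": 73.7975, "n": 1, "o": 74.06,
     "t": 1577941200000, "v": 135647456, "vw": 74.6099}
  ]
}
""")

rows = []
for bar in payload["results"]:
    rows.append({
        "ticker_symbol": payload["ticker"],
        # "t" is in milliseconds, so divide by 1000 before converting.
        "trade_time": datetime.fromtimestamp(bar["t"] / 1000, tz=timezone.utc),
        "trade_price": bar["c"],
        "trade_volume": bar["v"],
    })

print(rows[0]["trade_time"].isoformat())  # 2020-01-02T05:00:00+00:00
```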

VWAP Implementation on Snowflake

Before we get into the architecture, let's introduce some of the Snowflake features used to implement VWAP:

  • Snowflake Storage Integration stores the identity and access information for the AWS S3 Bucket.

  • A Snowflake Stage object identifies the location where the files are stored.

  • Snowflake Snowpipe enables loading of data from files in batches. A pipe uses a COPY statement to automate the loading of files. An AWS S3 Bucket can be configured to notify Snowpipe of available files to load into Snowflake using AWS Simple Queue Service (SQS).

  • The COPY INTO <table> SQL statement helps load data from files into an existing table.

  • Snowflake Dynamic Tables offers a simple way to automate the transformation of data. You can easily create data pipelines using Dynamic Tables.

  • The TIME_SLICE SQL function calculates the beginning or end of a "slice" of time.

  • Window functions aggregate over a group of related rows, known as a partition. I use them to calculate the cumulative VWAP over a period of time. In our case, the partition is the TICKER_SYMBOL: MSFT, AAPL, PEP, etc.

  • Snowflake Pandas API allows you to run Pandas code in a distributed manner.
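
To give a feel for what TIME_SLICE does in the list above, here is a small Python sketch that finds the start of the fixed-width slice containing a timestamp. It assumes slices are aligned to 1970-01-01 00:00:00, which is my understanding of how Snowflake aligns them. The function name is hypothetical.

```python
from datetime import datetime, timedelta

def time_slice_start(ts, minutes):
    # Assumption: slices are counted from the Unix epoch.
    epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
    width = timedelta(minutes=minutes)
    whole_slices = (ts - epoch) // width  # number of complete slices before ts
    return epoch + whole_slices * width

trade_time = datetime(2024, 11, 22, 14, 47, 30)
slice_start = time_slice_start(trade_time, 20)

print(slice_start)  # 2024-11-22 14:40:00
```

Every trade between 14:40:00 and 14:59:59 maps to the same slice start, which is what makes the value usable as a GROUP BY key.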

This is the architecture (Exhibit 1):

Exhibit 1: Volume-Weighted Average Price Architecture.



  • I use a Python app to access the Polygon API and store the JSON output in an AWS S3 Bucket.

  • The AWS S3 Bucket is configured to notify Snowflake Snowpipe when a file lands in the Bucket.

  • When Snowpipe receives the notification, it picks up the file from the Bucket and loads the raw JSON data into a table in Snowflake.

  • At this point, a Dynamic Table, PARSE_STOCK_TRADES_DT, starts the process of transforming the JSON data by parsing the various keys.

  • Another Dynamic Table, INTERMEDIATE_VWAP_STOCK_TRADES_DT, calculates the VWAP for various stocks over 20-minute time slices. In short, we take the 1-minute aggregate data from Polygon and calculate the VWAP for 20-minute slices.

  • Finally, the last Dynamic Table, VWAP_STOCK_TRADES_DT, calculates the cumulative VWAP using a Window function to aggregate the price and volume data over all the previous rows and the current row.

  • The final VWAP from the VWAP_STOCK_TRADES_DT can be presented in a dashboard as a chart.

The Snowflake Code Samples

Creating a Storage Integration

CREATE STORAGE INTEGRATION companystockprices_storage_int 
TYPE = EXTERNAL_STAGE 
STORAGE_PROVIDER = 'S3' 
ENABLED = TRUE 
STORAGE_AWS_ROLE_ARN = '<AWS IAM Role ARN>' 
STORAGE_ALLOWED_LOCATIONS = ('*');

Create a Stage Object

CREATE STAGE COMPANY_STOCK_TRADES_STG 
URL = '<AWS S3 Bucket Path>' 
STORAGE_INTEGRATION = companystockprices_storage_int;

Create a Table to Store the JSON Trade Data

CREATE OR REPLACE TRANSIENT TABLE COMPANY_STOCK_TRADES_RAW 
( 
TICKER VARIANT, 
RESULTS VARIANT
);

Create a Snowpipe

CREATE OR REPLACE PIPE COMPANY_STOCK_TRADES_PIPE 
AUTO_INGEST = TRUE 
AS 
COPY INTO DEMODB.EQUITY_RESEARCH.COMPANY_STOCK_TRADES_RAW 
FROM @DEMODB.EQUITY_RESEARCH.COMPANY_STOCK_TRADES_STG 
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE 
FILE_FORMAT = (type = 'JSON' STRIP_OUTER_ARRAY = TRUE);

Create a Dynamic Table to Store the Parsed Data from the Raw Table

CREATE OR REPLACE TRANSIENT DYNAMIC TABLE PARSE_STOCK_TRADES_DT 
( 
TICKER_SYMBOL   VARCHAR, 
TRADE_TIME           TIMESTAMP_NTZ, 
TRADE_PRICE         NUMBER(20, 4), 
TRADE_VOLUME    NUMBER
) 
TARGET_LAG = DOWNSTREAM 
WAREHOUSE = DEMO_XSMALL_WH 
REFRESH_MODE = INCREMENTAL 
AS 
SELECT 
TICKER::VARCHAR TICKER_SYMBOL, 
TO_TIMESTAMP_NTZ(TO_NUMBER(trades.VALUE:"t"),3) TRADE_TIME, 
TO_NUMBER(trades.VALUE:"c", 14, 4) TRADE_PRICE, 
TO_NUMBER(trades.VALUE:"v") TRADE_VOLUME 
FROM 
COMPANY_STOCK_TRADES_RAW CSTR, 
LATERAL FLATTEN (input => CSTR.RESULTS) TRADES 
ORDER BY TICKER_SYMBOL, TRADE_TIME;

Create a Dynamic Table to Calculate the Intermediate VWAP

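CREATE OR REPLACE TRANSIENT DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT 
( 
TRADE_TIME_SLICE                TIMESTAMP_NTZ, 
TICKER_SYMBOL                   VARCHAR, 
TICKER_SYMBOL_TRADE_TIME_SLICE  VARCHAR, 
SUM_PRICE                       NUMBER(20, 4), 
SUM_VOLUME                      NUMBER, 
INTERMEDIATE_SUM_PRICE_VOLUME   NUMBER(20, 4), 
INTERMEDIATE_VWAP               NUMBER(20, 4)
) 
TARGET_LAG = DOWNSTREAM 
WAREHOUSE = DEMO_XSMALL_WH 
REFRESH_MODE = INCREMENTAL 
AS 
SELECT 
-- Start of the 20-minute slice that contains the trade.
TIME_SLICE(TRADE_TIME, 20, 'MINUTE') TRADE_TIME_SLICE, 
TICKER_SYMBOL, 
-- Join key used by the Python merge later in this post. The original listing
-- did not show how this key is built, so the expression is an assumption.
TICKER_SYMBOL || '_' || TO_VARCHAR(TIME_SLICE(TRADE_TIME, 20, 'MINUTE')) TICKER_SYMBOL_TRADE_TIME_SLICE, 
SUM(TRADE_PRICE) SUM_PRICE, 
SUM(TRADE_VOLUME) SUM_VOLUME, 
SUM(TRADE_PRICE * TRADE_VOLUME) INTERMEDIATE_SUM_PRICE_VOLUME, 
SUM(TRADE_PRICE * TRADE_VOLUME) / SUM(TRADE_VOLUME) INTERMEDIATE_VWAP 
FROM 
PARSE_STOCK_TRADES_DT 
GROUP BY TICKER_SYMBOL, TRADE_TIME_SLICE, TICKER_SYMBOL_TRADE_TIME_SLICE 
ORDER BY TICKER_SYMBOL, TRADE_TIME_SLICE; 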
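
The grouping logic of this Dynamic Table can be sketched in plain Python: bucket each trade into its 20-minute slice, then divide the summed price-times-volume by the summed volume. The trades below are made up, and the helper relies on 20 dividing 60 evenly.

```python
from collections import defaultdict
from datetime import datetime

SLICE_MINUTES = 20  # same width as TIME_SLICE(TRADE_TIME, 20, 'MINUTE')

# Hypothetical 1-minute bars: (ticker, trade_time, price, volume).
trades = [
    ("MSFT", datetime(2024, 11, 22, 9, 30), 100.0, 10),
    ("MSFT", datetime(2024, 11, 22, 9, 35), 102.0, 30),
    ("MSFT", datetime(2024, 11, 22, 9, 45), 104.0, 20),
]

def slice_start(ts):
    # Round the minute down to the nearest multiple of SLICE_MINUTES.
    return ts.replace(minute=ts.minute - ts.minute % SLICE_MINUTES,
                      second=0, microsecond=0)

totals = defaultdict(lambda: {"sum_price_volume": 0.0, "sum_volume": 0})
for ticker, ts, price, volume in trades:
    key = (ticker, slice_start(ts))  # GROUP BY TICKER_SYMBOL, TRADE_TIME_SLICE
    totals[key]["sum_price_volume"] += price * volume
    totals[key]["sum_volume"] += volume

intermediate_vwap = {
    key: t["sum_price_volume"] / t["sum_volume"] for key, t in totals.items()
}

for key, vwap in sorted(intermediate_vwap.items()):
    print(key, vwap)
```

The 09:30 and 09:35 trades land in the 09:20 slice with a VWAP of (1000 + 3060) / 40 = 101.5, and the 09:45 trade is alone in the 09:40 slice.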

Create a Dynamic Table to Calculate the Cumulative VWAP

CREATE OR REPLACE TRANSIENT DYNAMIC TABLE VWAP_STOCK_TRADES_DT 
( 
TRADE_TIME_SLICE                    TIMESTAMP_NTZ, 
TICKER_SYMBOL                       VARCHAR, 
TICKER_SYMBOL_TRADE_TIME_SLICE      VARCHAR,  
CUMULATIVE_PRICE                    NUMBER(20,4), 
CUMULATIVE_VOLUME                   NUMBER, 
FINAL_VWAP                          NUMBER(20,4)
) 
TARGET_LAG = '30 minutes' 
WAREHOUSE = DEMO_XSMALL_WH 
REFRESH_MODE = INCREMENTAL 
AS 
SELECT 

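TRADE_TIME_SLICE, 
TICKER_SYMBOL, 
-- Join key declared in the column list above. The original listing omitted
-- this expression, so the format shown here is an assumption.
TICKER_SYMBOL || '_' || TO_VARCHAR(TRADE_TIME_SLICE) TICKER_SYMBOL_TRADE_TIME_SLICE, 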

(SUM(SUM_PRICE) OVER  (PARTITION BY TICKER_SYMBOL ORDER BY TRADE_TIME_SLICE ASC ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) CUMULATIVE_PRICE, 

(SUM(SUM_VOLUME) OVER (PARTITION BY TICKER_SYMBOL ORDER BY TRADE_TIME_SLICE ASC ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) CUMULATIVE_VOLUME,

(SUM(INTERMEDIATE_SUM_PRICE_VOLUME) OVER (PARTITION BY TICKER_SYMBOL 
ORDER BY TRADE_TIME_SLICE ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
/(SUM(SUM_VOLUME) OVER (PARTITION BY TICKER_SYMBOL ORDER BY TRADE_TIME_SLICE ASC ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) FINAL_VWAP 

FROM 
INTERMEDIATE_VWAP_STOCK_TRADES_DT 

ORDER BY TICKER_SYMBOL, TRADE_TIME_SLICE ASC;
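
The window clause ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW amounts to a running total per ticker. The Python sketch below imitates it with a dictionary of running sums, one entry per partition. The input rows are made up and stand in for the intermediate table.

```python
# Hypothetical rows: (ticker, slice label, sum of price * volume, sum of volume).
rows = [
    ("MSFT", "09:20", 4060.0, 40),
    ("AAPL", "09:20", 1500.0, 10),
    ("MSFT", "09:40", 2080.0, 20),
    ("AAPL", "09:40", 3100.0, 20),
]

running = {}  # ticker -> (cumulative price * volume, cumulative volume)
final_vwap = []

# Sorting by (ticker, slice) mirrors PARTITION BY ticker ORDER BY slice.
for ticker, time_slice, price_volume, volume in sorted(rows):
    cum_pv, cum_vol = running.get(ticker, (0.0, 0))
    cum_pv += price_volume
    cum_vol += volume
    running[ticker] = (cum_pv, cum_vol)
    final_vwap.append((ticker, time_slice, cum_pv / cum_vol))

for row in final_vwap:
    print(row)
```

Each ticker keeps its own running totals, so MSFT volume never leaks into the AAPL figures.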

You can visualize the data pipeline in Snowflake Snowsight (Exhibit 2). The active Dynamic Tables are shown with the dark blue arrows.

Exhibit 2: Data Pipeline Graph in Snowsight.




Charting Intermediate and Cumulative VWAP in Python

In Snowflake Notebook, you can easily create a session object using get_active_session()

import streamlit as st 
import matplotlib.pyplot as plt 
import seaborn as sns
# Snowpark Pandas API. 
# We are loading all the data in the Snowflake Pandas Data Frame.

import modin.pandas as spd 

# Import the Snowpark pandas plugin for modin 
import snowflake.snowpark.modin.plugin 

from snowflake.snowpark.context import get_active_session

# Create a Snowpark session 
session = get_active_session()

# Name of the sample database and the schema to be used 

SOURCE_DATA_PATH = "DEMODB.EQUITY_RESEARCH" 

Query the intermediate VWAP values from the INTERMEDIATE_VWAP_STOCK_TRADES_DT

# Query the Intermediate VWAP Dynamic Table.

intermediate_VWAP_df = spd.read_snowflake(
    f"{SOURCE_DATA_PATH}.INTERMEDIATE_VWAP_STOCK_TRADES_DT"
).sort_values(["TICKER_SYMBOL", "TRADE_TIME_SLICE"], ascending=True)

# Filter for the MSFT Values in the Pandas Data Frame.

filtered_intermediate_VWAP_df = intermediate_VWAP_df.where(
    intermediate_VWAP_df['TICKER_SYMBOL'] == 'MSFT'
)

# Remove all the NONE values from the Pandas Data Frame.

filtered_intermediate_VWAP_df = filtered_intermediate_VWAP_df.dropna()

Query the cumulative VWAP from the VWAP_STOCK_TRADES_DT

# Query the Cumulative VWAP Table 

final_VWAP_df = spd.read_snowflake(f"{SOURCE_DATA_PATH}.VWAP_STOCK_TRADES_DT")

# Filter for the MSFT values in the Pandas Data Frame.

filtered_final_VWAP_df = final_VWAP_df.where(final_VWAP_df['TICKER_SYMBOL'] == 'MSFT') 

# Remove all the NONE values from the Pandas Data Frame.

filtered_final_VWAP_df = filtered_final_VWAP_df.dropna()

Merge the intermediate VWAP and the Cumulative VWAP to use in a Python chart.

# Merge the Intermediate VWAP and Cumulative VWAP

spd_intermediate_and_final_vwap_df = filtered_intermediate_VWAP_df.merge(filtered_final_VWAP_df,
                                            left_on='TICKER_SYMBOL_TRADE_TIME_SLICE', 
                                            right_on='TICKER_SYMBOL_TRADE_TIME_SLICE', 
                                            how='left')

Use the merged Snowflake Pandas Data Frame to plot the chart.

data = {
    'TRADE_TIME_SLICE_x': spd_intermediate_and_final_vwap_df['TRADE_TIME_SLICE_x'],
    'INTERMEDIATE_VWAP': spd_intermediate_and_final_vwap_df['INTERMEDIATE_VWAP'],
    # The key must match the column name used in plt.plot() below.
    'FINAL_VWAP': spd_intermediate_and_final_vwap_df['FINAL_VWAP'],
}

df = spd.DataFrame(data)

# Create the plot 
plt.figure(figsize=(15, 6))
plt.plot(df['TRADE_TIME_SLICE_x'], df['INTERMEDIATE_VWAP'], label='INTERMEDIATE_VWAP')
plt.plot(df['TRADE_TIME_SLICE_x'], df['FINAL_VWAP'], label='FINAL_VWAP')

# Add title, labels, and legend 
plt.title('Microsoft Volume Weighted Average Price (VWAP)')
plt.xlabel('TRADE')
plt.ylabel('VWAP')
plt.legend()

# Show the plot
plt.show()

I merged the intermediate VWAP (20-Minute Time Slice) and the cumulative VWAP (From Feb 2023) and plotted it in a chart in Snowflake Notebook using Python. Here's how it looks:

We can see from the chart (Exhibit 3) that Microsoft's intermediate VWAP (blue line) is currently well above its cumulative VWAP (yellow line). Microsoft is benefiting from AI-led demand for its products and services, mixed with euphoria and the promise of more gains to come from AI-related product releases.

Exhibit 3: Microsoft's Intermediate VWAP (20-Minute Window) and Cumulative VWAP (Since Feb 2023)




On the other end of the spectrum is Nestle. The company has had a no good, very bad year since October 2023. Its intermediate VWAP (20-Minute Time Slice) has dropped well below its cumulative VWAP (Exhibit 4). Nestle has suffered from skyrocketing prices for cocoa and coffee, which have pressured its margins. Nestle is one of the most iconic brands, with a multitude of well-known products and sales in over 100 countries. It is in pretty bad shape today, but it should recover in the coming years.

Exhibit 4: Nestle Intermediate VWAP (20-Minute Time Slice) and Cumulative VWAP (Since Feb 2023).




In just a couple of hours you can ingest raw market data into Snowflake and transform it into signals such as Volume-Weighted Average Price (VWAP).

How Much Does Coca-Cola Spend on Advertising?

Coke's AI Generated Ad (Source: WSJ.com) Ads are meant to evoke a reaction, an emotion, and an action. Great ads can bring you t...