FinOps and shifting left your cloud finances

The old vs the new model of infrastructure finance

The old procurement model:

In the old days of infrastructure, ops teams, like any other part of the organization, had to follow a traditional procurement model: hardware requirements were set months or years in advance and had to account for growth. The process was long, subject to multiple budget approvals and delays, and OpEx and CapEx budgets had to be determined and approved as well.

The new cloud model

In the era of cloud, operations teams have more direct access to cloud resources, and to cloud spending, without having to go through a budget approval process. With this new flexibility in provisioning, the challenge becomes building a good governance model around cloud spending that doesn't become a roadblock for development and operations teams, while making sure finance teams have real-time visibility into the cloud bill.

What is FinOps?

FinOps is a culture and practice of financial awareness and transparency around cloud spending. Teams should use tools to understand their spend, forecast future spend, and take action to manage that spend as needed. FinOps is about using data to drive better decisions. It’s about efficiently using resources for the best return on investment. FinOps is about understanding how your organization is using cloud resources, and using that information to make strategic decisions about how to use your cloud spend more effectively.

FinOps Personas

Different people, or personas, are involved in a FinOps team. Each has a different role to play, but it is all centered around cloud provisioning and spending:

FinOps Practitioner : Drive best practices into the organization through education, standardization, and cheerleading

Execs : Leverage technology to give the business a market and competitive advantage

Product Owner : Quickly bring new products and features to market with an accurate price point

Finance : Accurately budget, forecast and report cloud costs

Engineering : Deliver faster and high quality services to the organisation, whilst maintaining business as usual

The Prius Effect

J.R. Storment and Mike Fuller, in their book Cloud FinOps, talk about the Prius Effect. It's basically an analogy to driving the hybrid EV: when you put your foot on the pedal you can instantly see the flow of energy from the battery to the engine, when you lift your foot the flow goes the other way, and the feedback is instantaneous.

Real-time reporting on cloud spend should follow the same model, providing transparency so everyone can see the effect of cloud provisioning. This is a capability that needs to be built and offered internally, making use of all the cloud billing resources, cost estimators, etc.

Unit Economics

Unit economics is the idea that cloud spend should be measured against a business metric: the number of orders delivered per day, the cost of serving a number of DAU (daily active users), revenue per ride for a ride-sharing company, etc.

This approach changes the narrative around cloud spend from simply reducing the cloud bill to correlating that amount with a business goal, which helps organizations distinguish between good cloud spend and bad.

Shifting left cloud finances with engineering and operation teams :

Just like DevOps and DevSecOps allowed us to shift left application delivery and application security, by getting developers and operations teams to work closely to release faster and embed application security practices in the pipelines, FinOps aims to eliminate the friction between ops teams and finance teams. One aspect of this collaboration can happen at the code level:

Imagine an operations team wanting to provision additional EC2 instances: the ops team writes a Terraform template and sends a pull request to your code repository.

Your pull request can then generate a report showing the forecasted expenses for such a change. Tools like Infracost let you create a report on the cost of the current infrastructure and the proposed one, in a way that helps the decision-making process: product managers can take a look at the report and decide to approve or deny the pull request.
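
As a sketch of what that report step could look like, the Infracost CLI can produce a cost baseline for the current Terraform code and then a diff for the proposed change (the paths and the baseline file name below are placeholders):

infracost breakdown --path . --format json --out-file infracost-base.json
infracost diff --path . --compare-to infracost-base.json

The diff output can then be posted as a comment on the pull request so reviewers see the projected monthly cost change next to the code change.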

While the scope of this step is limited to the engineering teams, it helps create more transparency around cloud expenses and has operations teams take better ownership of their cloud bill.

My Thoughts :

FinOps will have to become a culture adopted by every organization using cloud resources, driven by transparency and ownership of cloud expenses, but also by managing cloud investments. At the end of the day, FinOps is not only about saving money; it's also about making money, when more resources are needed for new projects.

There are a lot of talks, blogs, and webinars about cloud cost optimization, but they are often targeted only at engineering and operations teams, using very specific technical nomenclature that makes it almost impossible for non-technical stakeholders to understand.

J.R. Storment and Mike Fuller recommend having an actual FinOps team, composed of the different personas, with access to real-time reporting and a shared, understandable terminology that makes sense to everyone, while keeping in mind that one of the driving factors of cloud expenditure is unit economics.

Making your publisher/subscriber app more cloud agnostic using DAPR

Publisher/Subscriber Model

Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures. In a pub/sub model, any message published to a topic is immediately received by all of the subscribers to the topic. Pub/sub messaging can be used to enable event-driven architectures, or to decouple applications in order to increase performance, reliability and scalability.

Depending on which pub/sub technology you choose, your application code can become tightly coupled to it, which makes it difficult to switch from one provider to another without changing your application code.

Dapr (Distributed Application Runtime) gives you the possibility to abstract your pub/sub layer and write your code independently of your pub/sub provider (SQS, Azure Service Bus, Kafka, Azure Event Hubs, RabbitMQ, Redis Streams).

Example :

In this example we will have an order processing app where the publisher pushes orders to the messaging queue.

Application layer: publisher and subscriber, both Python programs

Dapr pub/sub component: we will use Azure Service Bus

1- Create an Azure Service Bus

For more details about how to create an Azure Service Bus, you can read this article.

2- Install Dapr

curl -fsSL https://raw.githubusercontent.com/dapr/cli/master/install/install.sh | /bin/bash

3- Initialize Dapr

In a terminal window, run: dapr init

4- Configure the Dapr pub/sub component

Create a YAML file called pubsub.yaml and place it under ~/.dapr/components.
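
A minimal sketch of what this component file could look like for Azure Service Bus is below; the component name (orderpubsub) and the connection string value are placeholders to fill in.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: orderpubsub
spec:
  type: pubsub.azure.servicebus
  version: v1
  metadata:
  - name: connectionString
    value: "<your Azure Service Bus connection string>"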

5- Start Dapr Application

dapr run --app-id daprapptest --dapr-http-port 3601

6- Run publisher application

Save the publisher application code to a publisher.py file.
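
A minimal sketch of such a publisher using the Dapr Python SDK might look like this; the topic name (orders) and the pub/sub component name (orderpubsub) are assumptions matching the component file above.

import json
import time

from dapr.clients import DaprClient

# publish a new order every second through the Dapr sidecar
with DaprClient() as client:
    for order_id in range(1, 11):
        order = {"orderId": order_id}
        client.publish_event(
            pubsub_name="orderpubsub",   # must match metadata.name in pubsub.yaml
            topic_name="orders",
            data=json.dumps(order),
            data_content_type="application/json",
        )
        print(f"Published order {order_id}")
        time.sleep(1)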

Starting the publisher app :

dapr run --app-id daprapptest --app-port 6001 --dapr-http-port 3601 --app-protocol grpc python3 publisher.py

The application starts publishing messages to the Azure Service Bus.

My Thoughts:

Modern cloud-native microservices need to be agnostic of the underlying technologies in order to achieve more independence and ease of deployment across platforms. In the example above we tried to make the publisher application code more independent of the technology: we used Azure Service Bus as the queue technology, but we could easily change it to AWS SQS, RabbitMQ, or any other platform that Dapr supports without having to make any changes to the application code.

Decentralized cloud is not ready for prime time … But it’s here to stay.

The whole concept of a decentralized cloud has always been interesting to me. After all, it's fair to say that the world of cloud today is monopolized by the three big players: AWS, Azure, and GCP, so a new approach to compute/storage based on blockchain technology is intriguing, and would make the cloud a more competitive destination rather than an exclusive big players' club.

Other big drivers for decentralization are the rapid expansion of web3 and DeFi apps and DAOs (Decentralized Autonomous Organizations), the growing interest from the public in more secure management of their personal data, and skepticism about the control that "Big Tech" has over individuals' information.

Because of its centralized nature, the cloud as we know it today has its frailties. In the last four years AWS has had significant outages and impairments, which have resulted in entire internet services being interrupted, from Fortune 500 companies to digital personal assistants and smart cat litter boxes going offline.

Centralized vs Decentralized apps

Decentralized apps (dApps) are the new model of deploying and running apps designed for web3. Unlike cloud-native apps, which run in data centers, can be restricted geographically to a region/availability zone, and are logically deployed on a single cloud provider, dApps don't have these constraints.

DApps have their backend code (smart contracts) running on a decentralized network and not a centralized server. They use blockchain for data storage and smart contracts for their app logic.

A smart contract is like a set of rules that live on-chain for all to see and run exactly according to those rules. Imagine a vending machine: if you supply it with enough funds and the right selection, you’ll get the item you want. And like vending machines, smart contracts can hold funds . This allows code to mediate agreements and transactions.

The idea behind the decentralized cloud is to leverage underutilized CPU/storage capacity by being part of a blockchain solution like Ankr, Solana, or Akash. But in order to offer a viable alternative to the traditional centralized cloud, a high degree of service availability, easy integration, and secure communication will be required.

Just like compute, storage, DNS, and databases also have blockchain solutions, for example:

Storj : https://storj.io

Storj DCS is the world’s first open-source, decentralized cloud storage layer that’s private by design and secure by default – enabling developers to build in the best data protection and privacy into their applications as possible. The zero trust architecture, multi-region high availability, default encryption and edge-based access controls minimize risk and give only you, or those you grant permission to, access to your files. The result is that you take back full ownership and control of your data.

Handshake : https://hsd-dev.org

Handshake is a UTXO-based blockchain protocol which manages the registration, renewal and transfer of DNS top-level domains (TLDs). Our naming protocol differs from its predecessors in that it has no concept of namespacing or subdomains at the consensus layer. Its purpose is not to replace DNS, but to replace the root zone file and the root servers.

BigchainDB: https://www.bigchaindb.com

BigchainDB is a blockchain database offering decentralization, immutability and native assets. It allows developers and enterprises to deploy blockchain proofs-of-concept, platforms and applications, and supports large-scale deployments in a variety of use cases and industries, from intellectual property and identity to supply chain and the Internet of Things.

My first decentralized cloud app: Akash Network

I looked at different platforms to POC an app deployment, and I decided to give Akash Network a try.

Akash is an open source Cloud platform that lets you quickly deploy a Docker container to the Cloud provider of your choice.

The Akash Marketplace is where users lease computing resources from Cloud providers before deploying a Docker container on the Akash Container Platform. The marketplace stores on-chain records of requests, bids, leases, and settlement payments using the Akash Token (AKT).

How to deploy an app on Akash Cloud:

Using their deployment platform, Akashlytics Deploy:

  • Define your Docker image, CPU, Memory, and Storage in a deploy.yaml file.
  • Set your price, receive bids from providers in seconds, and select the lowest price.
  • Deploy your application without having to set up, configure, or manage servers.
  • Scale your application from a single container to hundreds of deployments.

Akashlytics is a deployment platform that lets you interact with the marketplace of data centers willing to host the application. It allows you to:

1 – Create the manifest file

2 – Place the bid

3 – Accept the offer

4 – Deploy the app and create the lease

My thoughts :

I think it’s important to separate the hype from reality when it comes to the present of decentralized cloud, or dApps, while I think the idea of having an infrastructure that’s relies on other platforms than the big cloud players ( AWS/Azure/GPC ) is not only promising, but necessary, we should acknowledge that it’s in its early stages, lacking the stability, the reliability, and governance that both big and small businesses need.

One apparent advantage of using underutilized compute and storage across the globe is definitely driving cost down; Akash Cloud, for example, advertises prices that are 80% lower than those of public clouds.

Having low compute prices can be a good incentive for adoption; it can be a good place to start for startups and companies with a small cloud footprint/budget.

Transparency is another aspect of the decentralized cloud: since you pay upfront (with your digital wallet) in coins for your monthly usage, you can avoid the "surprise fees" that you tend to find in your public cloud bill.

Finally, I’d say i’m excited that the idea of demonopolizing the cloud has started to gain momentum, let’s keep it going !

PART 1 : Continuously Loading Synthea Patients Data to Azure FHIR API

What is FHIR ?

Fast Healthcare Interoperability Resources (FHIR, pronounced “fire”) is a standard describing data formats and elements (known as “resources”) and an application programming interface (API) for exchanging electronic health records (EHR). The standard was created by the Health Level Seven International (HL7) health-care standards organization.

FHIR is organized by resources (e.g., patient, observation). Such resources can be specified further by defining FHIR profiles (for example, binding to a specific terminology). A collection of profiles can be published as an implementation guide (IG), such as the U.S. Core Data for Interoperability.

Because FHIR is implemented on top of the HTTPS (HTTP Secure) protocol, FHIR resources can be retrieved and parsed by analytics platforms for real-time data gathering. In this concept, healthcare organizations would be able to gather real-time data from specified resource models. FHIR resources can be streamed to a data store where they can be correlated with other informatics data. Potential use cases include epidemic tracking, prescription drug fraud, adverse drug interaction warnings, and the reduction of emergency room wait times.

Architecture

Part 1

Here we will use GitHub Actions to automate the ingestion of Synthea-generated patient data into the Azure FHIR API.

Part 2: in a future blog post we will look deeper into the data and do some ML on it.

Synthea

github project : https://github.com/synthetichealth/synthea

Synthea is a Synthetic Patient Population Simulator. The goal is to output synthetic, realistic (but not real), patient data and associated health records in a variety of formats.

Currently, Synthea features include:

  • Birth to Death Lifecycle
  • Configuration-based statistics and demographics (defaults with Massachusetts Census data)
  • Modular Rule System
    • Drop in Generic Modules
    • Custom Java rules modules for additional capabilities
  • Primary Care Encounters, Emergency Room Encounters, and Symptom-Driven Encounters
  • Conditions, Allergies, Medications, Vaccinations, Observations/Vitals, Labs, Procedures, CarePlans
  • Formats
    • HL7 FHIR (STU3 v3.0.1, DSTU2 v1.0.2 and R4)
    • Bulk FHIR in ndjson format (set exporter.fhir.bulk_data = true to activate)
    • C-CDA (set exporter.ccda.export = true to activate)
    • CSV (set exporter.csv.export = true to activate)
    • CPCDS (set exporter.cpcds.export = true to activate)
  • Rendering Rules and Disease Modules with Graphviz
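
For reference, generating a batch of synthetic patients locally is a single command; the population size and the optional state are up to you (100 patients for Massachusetts shown here):

./run_synthea -p 100 Massachusetts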

Azure FHIR API

Azure Healthcare APIs provides pipelines that help you manage protected health information (PHI) data at scale. Rapidly exchange data and run new applications with APIs for health data standards including Fast Healthcare Interoperability Resources (FHIR) and Digital Imaging Communications in Medicine (DICOM). Ingest, standardize, and transform data with easy-to-deploy tools and connectors for device and unstructured data. Expand discovery of insights by connecting to tools for visualizations, machine learning (ML), and AI.

Setup Azure FHIR API :

In the Azure portal, search for the FHIR service (Azure API for FHIR) and create a new instance.

FhirLoader :

I used a FHIR loader developed by Michael Hansen

it’s a dotnet application that can be run using client ID and secret ID :

dotnet run -- --client-secret "XXXX" --client-id "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" --input-folder ..\synthea\output\fhir\ --authority "https://login.microsoftonline.com/{tenant-id}" --fhir-server-url "https://{myfhirserver}.azurehealthcareapis.com" --max-degree-of-parallelism 14

Github Actions :

In order to automate the ingestion of patient data into the Azure FHIR API, we will use a GitHub Actions workflow file.

For all the credentials used for authentication, I used GitHub Secrets.
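
A minimal sketch of what such a workflow could look like is below. The job layout, the secret names, and the assumption that the loader project lives in a FhirLoader folder of the repository are all illustrative; the loader flags mirror the dotnet command shown above.

name: synthea-to-fhir
on:
  workflow_dispatch:

jobs:
  load-patients:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      # Generate a batch of synthetic patients (Java is preinstalled on the runner)
      - name: Generate Synthea patients
        run: |
          git clone --depth 1 https://github.com/synthetichealth/synthea.git
          cd synthea
          ./run_synthea -p 50 Massachusetts

      # Push the generated FHIR bundles to the Azure FHIR API with the loader shown above
      - name: Load bundles into Azure FHIR API
        working-directory: ./FhirLoader
        run: |
          dotnet run -- \
            --client-id "${{ secrets.FHIR_CLIENT_ID }}" \
            --client-secret "${{ secrets.FHIR_CLIENT_SECRET }}" \
            --authority "https://login.microsoftonline.com/${{ secrets.AZURE_TENANT_ID }}" \
            --fhir-server-url "${{ secrets.FHIR_SERVER_URL }}" \
            --input-folder ../synthea/output/fhir/ \
            --max-degree-of-parallelism 4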

After running the job, Synthea generates the synthetic patient data and the loader pushes it to the FHIR service.

To check on the Azure FHIR API side, I use Postman to query the FHIR endpoints (for example, a GET on the Patient resource).

In the next blog post we will see how to load the data into Azure Databricks and work on training and deploying models.

Deploy Machine Learning at scale with Kedro and Cortex.dev

ML model deployment in production is still an area that lacks conformity, nomenclature, and patterns. Aside from a few technology companies who started the journey early on, for late adopters of ML practices it's pretty much the wild west when it comes to standards of model deployment.

Other challenges include the culture of data science teams inside the organization and productionizing the process of model release. Data scientists often come from an academic and research background and tend to focus on perfecting the quality of predictions and classifications by running different experiments, tracking them, lowering cost functions, etc., while the data engineering realm tends to focus on streamlining the delivery of models to production. Here is a great article by Assaf Pinhasi about the cultural gap in data science teams.

Traditional approach

The most common practice I have seen in different projects and organizations tends to be:

To run all the experiments needed for data exploration and model tuning, tools or platforms like Jupyter notebooks, Databricks notebooks and others are used by data scientists. Once the model is trained, tuned with the right parameters, and saved into an artifact (like a pickle file), the code gets committed to a git repository, and the work of data engineering begins.

From that point on, a data engineer needs to:

  • Build a data pipeline by creating the training and automation scripts ( train.py and predict.py)
  • Design a deployment strategy like a micro-services architecture with different services : inference, data preparation…
  • Build a CI/CD pipeline with the right ML-driven automation tests
  • Design model monitoring to capture concept drift
  • Size the right hardware needed to run the inference and the training

The boundaries of this collaboration between data science and data engineering often feel blurry, leaving room for a lot of “who is supposed to do what”, and in most cases requires data engineers to spend time understanding the steps followed.

The new approach

The idea behind this post is to showcase an example of streamlined ML deployment and training using a combination of two ML frameworks, Kedro and cortex.dev, with a minimum amount of code.

What is Kedro ? Kedro is an open-source Python framework for creating reproducible, maintainable and modular data science code. It borrows concepts from software engineering and applies them to machine-learning code; applied concepts include modularity, separation of concerns and versioning.

What is Cortex? Cortex is an open source platform for large-scale inference workloads. It has the following capabilities:

  • Supports deploying TensorFlow, PyTorch, and other models as realtime or batch APIs.
  • Ensures high availability with availability zones and automated instance restarts.
  • Runs inference on on-demand instances or spot instances with on-demand backups.
  • Autoscales to handle production workloads with support for overprovisioning.

The combination of both tools allows us to implement a new approach that will:

  • Shift the data pipeline build to the data science side (kedro)
  • Parametrize model training by externalizing inputs like : split the train/test data, algorithms used, learning rates, epochs ( kedro )
  • Introduce the notion of nodes and pipelines and data catalogues (kedro)
  • Standardize outputs/inputs of nodes and persist results (kedro)
  • Guarantee scalability and repeatability : it’s easy to reuse nodes and pipelines for new data sources to create models specific to a similar business unit (kedro)
  • Design and build the inference infrastructure (cortex)
  • Flexibility to create batch or realtime API endpoints (cortex)
  • ease the management of dependencies ( cortex & kedro )

The new architecture would look something like this:

One of Kedro’s features is to break the ML steps into nodes and pipelines, generally there is a

Data Engineering pipeline : for data processing, feature extraction, normalization, encoding …

Data Science pipeline : Splitting the data to training and test sets, the designing the model(s), evaluation

Cortex, on the other hand, takes care of creating and deploying the model generated by the Kedro pipeline using the cortex operator. It creates a Kubernetes cluster in either AWS or GCP using a simple infrastructure description file:

region: us-east-1
instance_type: t2.medium
min_instances: 5
max_instances: 10
spot: true

It also creates a load balancer to distribute inference across all the cluster nodes, and an API gateway to serve the API responses.

Example : New York Taxi Trip Duration

I will be using a kaggle dataset that has the following data structure :

We aim to create a model that will predict the trip duration based on the other input features, like pickup date, pickup location (longitude, latitude), drop-off location, etc.

Considering that we have a sizable amount of training data (1,458,645 rows) and there is no sparsity in the data and no dimensionality reduction needed, I will use LightGBM for this proof of concept.

Creating Nodes and Pipelines

Nodes are the building blocks of pipelines and represent tasks. Pipelines are used to combine nodes to build workflows, which range from simple machine learning workflows to end-to-end production workflows.

In our case, a node will represent tasks like the following (a sketch of the corresponding Kedro code follows the list):

  • Feature extraction: hour of the day, day of the month, month of the year
  • Split data: train and test sets
  • Train model: training using LightGBM
  • Evaluate model: calculating metrics
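
A minimal sketch of how these tasks could be declared as Kedro nodes and chained into a pipeline is below; the function bodies and the dataset names ("trips_train", "parameters", "regressor") are illustrative, not the actual project code.

from kedro.pipeline import Pipeline, node

def extract_features(trips):                    # hour of day, day of month, month of year ...
    ...

def split_data(features, parameters):           # train/test split driven by parameters.yml
    ...

def train_model(X_train, y_train, parameters):  # LightGBM training
    ...

def evaluate_model(regressor, X_test, y_test):  # compute evaluation metrics
    ...

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(extract_features, inputs="trips_train", outputs="trips_features"),
            node(split_data, inputs=["trips_features", "parameters"],
                 outputs=["X_train", "X_test", "y_train", "y_test"]),
            node(train_model, inputs=["X_train", "y_train", "parameters"], outputs="regressor"),
            node(evaluate_model, inputs=["regressor", "X_test", "y_test"], outputs=None),
        ]
    )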

Pipeline organises the dependencies and execution order of your collection of nodes, and connects inputs and outputs while keeping your code modular. The pipeline determines the node execution order by resolving dependencies and does not necessarily run the nodes in the order in which they are passed in.

To benefit from Kedro’s automatic dependency resolution, you can chain your nodes into a pipeline, which is a list of nodes that use a shared set of variables.

Here is the code for both pipelines. To run the project, change directory to ny_cab_trip_duration_kedro_training and run the command:

ny_cab_trip_duration_kedro_training$ kedro run
2021-01-19 19:35:43,309 - kedro.io.data_catalog - INFO - Loading data from `trips_train` (CSVDataSet)...
2021-01-19 19:35:46,316 - kedro.pipeline.node - INFO - Running node: extract_features: extract_features([trips_train]) -> [extract_features]
2021-01-19 19:36:24,834 - numexpr.utils - INFO - Note: NumExpr detected 12 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2021-01-19 19:36:24,834 - numexpr.utils - INFO - NumExpr defaulting to 8 threads.
2021-01-19 19:39:27,635 - kedro.io.data_catalog - INFO - Saving data

This will run all nodes described above and generate the model file :

Model Deployment With Cortex

The amount of code needed for deployment is really minimal with Cortex, which makes automating and streamlining deployment extremely easy. In a few steps we can have APIs deployed with the latest version of the model. Here are the steps:

  • Build a cloud deployment cluster
  • Deploy the model

Build cloud kubernetes cluster:

$cortex cluster  up -c  basic-cluster.yaml --aws-key AKIA3KH6IPR6WOSYNHVU --aws-secret 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

○ creating a new s3 bucket: cortex-6a2d11117c ✓
○ creating a new cloudwatch log group: cortex ✓
○ creating cloudwatch dashboard: cortex ✓
○ creating api gateway: cortex ✓
○ spinning up the cluster (this will take about 15 minutes) ...

At the end of the execution :

[✔]  EKS cluster "cortex" in "us-east-1" region is ready

○ updating cluster configuration ✓
○ configuring networking (this might take a few minutes) ✓
○ configuring autoscaling ✓
○ configuring logging ✓
○ configuring metrics ✓
○ starting operator ✓
○ waiting for load balancers ............................................................................ ✓
○ downloading docker images ✓

cortex is ready!

operator:          a1bcbfeb26cef442e92bbcd0daffbf42-01a64d913ebbf7b6.elb.us-east-1.amazonaws.com
api load balancer: a8e87f75709de4e96bbc3871b8ef9ceb-a6ec41dfe22e0c12.elb.us-east-1.amazonaws.com
api gateway:       https://g06o0hssmj.execute-api.us-east-1.amazonaws.com

Deploy Model

Note: I have copied the model created by the Kedro pipeline to S3 under s3://cortex-6a2d11117c/tmp/

First we create a descriptive YAML of the model:

- name: trip-estimator
  kind: RealtimeAPI
  predictor:
    type: python
    path: predictor.py
    config:
      model: s3://cortex-6a2d11117c/tmp/
  monitoring:
    model_type: regression
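
The YAML above points at a predictor.py. A minimal sketch of what that file might contain is below; the model file name (model.txt), the S3 layout, and the payload handling are assumptions, and Cortex passes the config: block of the API spec to the constructor.

import os
import boto3
import numpy as np
import lightgbm as lgb

class PythonPredictor:
    def __init__(self, config):
        # config is the `config:` section above, e.g. {"model": "s3://cortex-6a2d11117c/tmp/"}
        bucket, _, prefix = config["model"].replace("s3://", "").partition("/")
        key = os.path.join(prefix, "model.txt")   # assumption: the Kedro pipeline saved the booster as model.txt
        boto3.client("s3").download_file(bucket, key, "/tmp/model.txt")
        self.model = lgb.Booster(model_file="/tmp/model.txt")

    def predict(self, payload):
        # payload is the parsed request body; values must follow the training feature order
        features = np.array([[float(v) for v in payload.values()]])
        return float(self.model.predict(features)[0])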

Deploying it is as easy as :

cortex-trip-estimator$ cortex deploy trip_estimator.yaml
using aws environment

updating trip-estimator (RealtimeAPI)

cortex get                  (show api statuses)
cortex get trip-estimator   (show api info)
cortex logs trip-estimator  (stream api logs)

To make sure the API is deployed :

cortex get
env   realtime api     status   up-to-date   requested   last update   avg request   2XX
aws   trip-estimator   live     1            1           12m16s        -             -

Consuming the API

I created a small script to test the API:

import requests
endpoint = "https://g06o0hssmj.execute-api.us-east-1.amazonaws.com/trip-estimator"
payload = {
    "input1":2.0,
    "input2":6.0,
    "input3":-73.96147155761719,
    "input5":40.774391174316406,
    "input6":-73.9537124633789,
    "input7":40.77536010742188,
    "input8":0.6621858468807331,
    "input9":0.007819358973817251,
    "input10":0.007788946222373263,
    "input11":6.0,
    "input12":23.0,
    "input13":33.0,
    "input14":0.0,
    "input15":3.0,
    "input16":0.0,
    "input17":0.0
}
prediction = requests.post(endpoint, payload)
trip_duration = str(prediction.content,'utf-8')
print("Trip duration is : ", trip_duration)

Run the script :

python consume.py

et voila !

cortex-trip-estimator$ python consume.py
Trip duration is :  5.252568735167378

Conclusion

With the growing number of platforms, tools, and frameworks that facilitate the deployment of machine learning, we will eventually get to a point where we have defined patterns, and standards.

The foundation of successful ML projects is having data scientists and data engineers speak the same language, by defining pipelines, tasks, inputs, outputs, at that point it becomes easy to streamline and automate the delivery.

Power generation prediction using realtime inference with Spark Structured Streaming and Kafka

Modern power plants generate an enormous amount of data, with an ever-growing number of high-frequency sensors. Some of the applications of this data are protection against problems induced by combustion dynamics, improving the plant heat rate, and optimizing power generation.

If we take a sensor that monitors the combustion process at a frequency of 25 kHz, it could generate up to 10 gigabytes of data a day; scale that to hundreds (sometimes thousands) of sensors and you end up with something in the order of terabytes a day.

In the energy industry, independent power producers with big fleets of generation units are gathering massive quantities of data from their plants. Whether it's for reporting and dashboarding, making recommendations in real time to improve generation, or simply alerting about a potential issue, real-time processing of streaming data is and should be an integral part of this data pipeline.

Kappa vs Lambda Architecture :

Generally there are two approaches to realtime streaming: the Lambda and Kappa architectures.

Lambda architecture:

Lambda architecture is a way of processing massive quantities of data (i.e. “Big Data”) that provides access to batch-processing and stream-processing methods with a hybrid approach. Lambda architecture is used to solve the problem of computing arbitrary functions. The lambda architecture itself is composed of 3 layers : Batch, Speed, and Serving.

The batch layer manages historical data and handles all the computation and transformations on the data, including machine learning inference.

The speed layer handles all low-latency and realtime queries, and the serving layer merges the outputs of both to answer queries.

Kappa Architecture :

The Kappa Architecture is used for processing streaming data. The main premise behind the Kappa Architecture is that you can perform both real-time and batch processing, especially for analytics, with a single technology stack. It is based on a streaming architecture in which an incoming series of data is first stored in a messaging engine like Apache Kafka. From there, a stream processing engine will read the data and transform it into an analyzable format, and then store it into an analytics database for end users to query.

Both architectures are also useful for addressing “human fault tolerance,” in which problems with the processing code (either bugs or just known limitations) can be overcome by updating the code and running it again on the historical data. The main difference with the Kappa Architecture is that all data is treated as if it were a stream, so the stream processing engine acts as the sole data transformation engine.

Example :

if you want to take a look at the code first, here are the links to the notebook and kafka producer code.

databricks notebook

Code : git repo

To demonstrate realtime processing, I will deploy a linear regression model that predicts power generation in MWs based on sensor data.

I will be using the Combined Cycle Power Plant Data Set; the dataset contains 9,568 data points collected over six years (2006 to 2011) with these columns:

Features consist of hourly average ambient variables

  • Temperature (T) in the range 1.81°C to 37.11°C
  • Ambient Pressure (AP) in the range 992.89-1033.30 millibar
  • Relative Humidity (RH) in the range 25.56% to 100.16%
  • Exhaust Vacuum (V) in the range 25.36-81.56 cm Hg
  • Net hourly electrical energy output (EP) 420.26-495.76 MW

I will use these frameworks :

Kafka (Confluent): as the upstream data source

Spark Structured Streaming: as the processing engine

Spark ML: to train the model

Data Exploration :

Histogram of all features and labels

Training :

After downloading the data, we select the feature columns, create an assembler vector to gather all the features, split the data into test and train sets, and create a Linear Regression using featuresCol and labelCol.

After the model is trained, we measure the model metrics (MAE, RMSE, R2).

%python
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

#After downloading the data, we select the features columns
feature_columns = df.columns[:-1]

# Create an assembler Vector
assembler = VectorAssembler(inputCols=feature_columns,outputCol="features")
df2 = assembler.transform(df)
df_train = df2.select("features","PE")

# Split the data to test and train
train, test = df_train.randomSplit([0.7, 0.3])

# Create Linear Regression - using featuresCOl and LabelCol
lr = LinearRegression(featuresCol="features", labelCol="PE")
model = lr.fit(train)

#Measure Model metrics
evaluation_summary = model.evaluate(test)

#evaluate
print(evaluation_summary.meanAbsoluteError)
print(evaluation_summary.rootMeanSquaredError)
print(evaluation_summary.r2)

Prediction on the test set:

The transform method creates an extra column called prediction.

Predictions on streaming data

Power plants like combined cycle gas plants or coal or nuclear tend to generate an enormous amount of sensor data.

The Lambda architecture approach is to collect, process, and clean the data and store it in a data lake, waiting for the next batch to run inference on it; the second approach (Kappa architecture) is to run the model on the data as it's streaming. In this example we will use these main frameworks to run realtime predictions:

  • Kafka : a data streaming framework allowing producers to write the data upstream and consumer to read downstream
  • Spark SQL : Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
  • Spark Structured Streaming : scalable fault tolerant streaming processing engine built on top of SparkSQL provides the possibility to run transformation and ML models on streaming data

Here is the code to initiate a read stream from a Kafka cluster running on host plc-4nyp6.us-east-1.aws.confluent.cloud:9092.

The readStream() method creates a streaming DataFrame that will hold the values captured from the Kafka stream over time.

dfs = (spark.readStream
       .format("kafka")
       .option("includeHeaders", "true")
       .option("kafka.bootstrap.servers", "plc-4nyp6.us-east-1.aws.confluent.cloud:9092")
       .option("subscribe", "ccgt_sensors")
       .option("startingOffsets", "latest")
       .option("kafka.security.protocol", "SASL_SSL")
       .option("kafka.sasl.mechanism", "PLAIN")
       .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"HXKG22IOPJ2RUDEJ\" password=\"zZLvDdDcBYFkolkaQJdvxrfyUuU/ju/egbgxD3+1GqH5g5mQ5ShhQ0PLh59sSv+xG\";")
       .load())

dfs2 = dfs.selectExpr("CAST(value AS STRING)")

This will read the stream of input values as a string.

At this point we need to split the string into an array of strings based on the comma delimiter, and cast the list as array<double>.

Since the model expects a vector of values to run the transformation (prediction), I created a UDF (User Defined Function) to convert the list of doubles into a vector using the Vectors.dense() method.

from pyspark.sql.functions import split, udf
from pyspark.ml.linalg import Vectors, VectorUDT

# cast the string inputs as an array of doubles
values = dfs2.select(
       split(dfs2.value, ",").cast("array<Double>").alias("inputs")
)

print(test.printSchema)
print(values.printSchema)

# create a UDF to convert the list of doubles into a dense feature vector
conv_vec = udf(lambda vs: Vectors.dense([float(i) for i in vs]), VectorUDT())

# apply the UDF to build the streaming dataframe of feature vectors used below
new_df = values.select(conv_vec(values.inputs).alias("features"))

Now that we have the streaming input data as a vector, we can run predictions:

def predict_mw(sdf):
  mw = model.transform(sdf) 
  return mw

df_out = predict_mw(new_df)

display(df_out)

The result is a realtime prediction of the MWs generated by the power plant based on the input features.
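
Instead of only displaying the output, the streaming predictions can also be written continuously to a sink; a minimal sketch is below (the in-memory sink and the query name are illustrative choices, not part of the original notebook).

# write the streaming predictions to an in-memory table that can be queried with Spark SQL
query = (df_out
         .select("features", "prediction")
         .writeStream
         .format("memory")              # a production pipeline would use a durable sink such as delta/parquet
         .queryName("mw_predictions")
         .outputMode("append")
         .start())

# spark.sql("SELECT * FROM mw_predictions").show()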

Deploy PJM Electricity Load Forecast Model on AWS SageMaker

In the energy industry, forecasting the grid load is vital for various commercial optimizations around Day-Ahead and Real-Time trading, but it also helps Independent Power Producers (IPPs) allocate the right generation units.

We talked previously about energy markets: CAISO, PJM, ERCOT, and others. In this article the goal is not to discuss the accuracy of the model in predicting load, but to highlight the AWS SageMaker way of deploying ML models.

The Dataset :

You can use the PJM tool Data Miner to extract the load. Data Miner is PJM's enhanced data management tool, giving members and non-members easier, faster and more reliable access to public data formerly posted on pjm.com.

PJM dataMiner

The initial dataset will look something like this:

To simplify this example, we can delete the middle two columns and keep just the datetime and the load in MW.

Splitting Training and Test Data

We will split the data into training and test sets:

Training data: 6,578 rows

Test data: 2,021 rows

Feature engineering

One transformation we will make on the data is to create new features from the datetime field:

  • hour of the day
  • day of the week
  • month
  • quarter
  • year
  • day of the month
  • day of the year
  • week of the year

# Create features from the datetime index
def create_features(df, label=None):
    df['date'] = df.index
    df['hour'] = df['date'].dt.hour
    df['dayofweek'] = df['date'].dt.dayofweek
    df['month'] = df['date'].dt.month
    df['quarter'] = df['date'].dt.quarter
    df['year'] = df['date'].dt.year
    df['dayofyear'] = df['date'].dt.dayofyear
    df['dayofmonth'] = df['date'].dt.day
    df['weekofyear'] = df['date'].dt.weekofyear

    # return the feature matrix, and the label series when a label column is given
    X = df[['hour', 'dayofweek', 'quarter', 'month', 'year',
            'dayofyear', 'dayofmonth', 'weekofyear']]
    if label:
        y = df[label]
        return X, y
    return X
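
Applying it to the two splits would look roughly like this, assuming the dataframes are indexed by datetime and the load column is named 'MW' (both assumptions here):

X_train, y_train = create_features(train, label='MW')
X_test, y_test = create_features(test, label='MW')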

Building Model with XGBoost

What is XGBoost?

XGBoost is an optimized distributed gradient boosting library designed to be highly efficient, flexible and portable. It implements machine learning algorithms under the Gradient Boosting framework. XGBoost provides a parallel tree boosting (also known as GBDT, GBM) that solves many data science problems in a fast and accurate way.

XGBoost algorithms are supervised machine learning algorithms, with both a classifier and a regressor implementation.

In this instance we will use the XGBoost regressor for predictions:

reg = xgb.XGBRegressor(n_estimators=1000)
reg.fit(X_train, y_train,
        eval_set=[(X_train, y_train), (X_test, y_test)],
        verbose = True)

Results of prediction

AWS Endpoint Deployment

To be able to deploy the model on AWS we must:

  • Save the model in S3
  • Create a container with the model file
  • Create a SageMaker endpoint configuration
  • Create a SageMaker endpoint

Saving Model to s3

import os
import boto3

region = 'us-east-1'
bucket = "pjm-load-forecast"
prefix = 'sagemaker/pjm-forecast-xgboost-byo'
bucket_path = 'https://s3-{}.amazonaws.com/{}'.format(region, bucket)

# upload the trained model archive (model.tar.gz) to S3
fObj = open("model.tar.gz", 'rb')
key = os.path.join(prefix, model_file_name, 'model.tar.gz')
boto3.Session().resource('s3').Bucket(bucket).Object(key).upload_fileobj(fObj)

Creating Model / Endpoint configuration / Endpoints

Importing the XGBoost image

import sagemaker
container = sagemaker.image_uris.retrieve("xgboost", boto3.Session().region_name, "1.2-1")

Creating the model

from sagemaker import get_execution_role

# SageMaker control-plane client and the S3 location of the model.tar.gz uploaded above
sm_client = boto3.client('sagemaker')
model_url = bucket_path + '/' + key

primary_container = {
    'Image': container,
    'ModelDataUrl': model_url,
}
role = get_execution_role()
create_model_response2 = sm_client.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

Once created, the model is saved to AWS SageMaker.

Create endpoint configuration

endpoint_config_name = 'PJM-LoadForecast-XGBoostEndpointConfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.t2.medium',
        'InitialInstanceCount':1,
        'InitialVariantWeight':1,
        'ModelName':model_name,
        'VariantName':'AllTraffic'}])

The same goes for the endpoint configuration; once created, you can view it in the AWS console.

Deploy Endpoint

endpoint_name = 'PJM-LoadForecast-XGBoostEndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)

Validation

SageMaker allows you to validate your endpoint; here is a code snippet for that:

# SageMaker runtime client used to invoke the deployed endpoint
runtime_client = boto3.client('runtime.sagemaker')

file_name = 'test_point.csv'
with open(file_name, 'r') as f:
    payload = f.read().strip()
print('payload', payload)
response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,
                                   ContentType='text/csv',
                                   Body=payload)
result = response['Body'].read().decode('ascii')

Notebook : you can find the notebook for this article on github

Conclusion

AWS SageMaker has built-in algorithms for both supervised and unsupervised machine learning models. It provides a great platform for training and deploying machine learning models into a production environment on AWS. By combining this powerful platform with the serverless capabilities of Amazon Simple Storage Service (S3), Amazon API Gateway, and AWS Lambda, it's possible to transform an Amazon SageMaker endpoint into a web application that accepts new input data, potentially from a variety of sources, and presents the resulting inferences to an end user.

Predicting Electricity Wholesale Prices using AWS Machine Learning

Electricity Wholesale Markets

Most of the nation’s wholesale electricity sales happen in a competitive market managed by Independent System Operator (ISO), with over 200 million customers in these areas and over $120 billion in annual energy transactions taking place. Under the Federal Power Act, these markets are overseen by the Federal Energy Regulatory Commission (FERC), which ultimately determines the guidelines for how wholesale electricity is bought and sold in the marketplace. RTOs/ISOs create the market rules that enforce whether and how energy resources can compete. Wholesale markets should allow all resources to compete on price and performance, as the Federal Power Act requires that the rates, terms, and conditions of service governing wholesale competitive markets be “just and reasonable”.


Predicting Energy Prices

In a competitive market, being able to predict energy prices does give IPPs a great advantage in formulating their bids for the day ahead market.

The day-ahead (DA) market is purely financial, which means even financial institutions that are not power producers can hedge and profit from buying/selling bulk energy in the DA market.

Energy Price Forecasting (EPF) has become very instrumental in the decision-making process for day-to-day energy bids, but also for creating a point of view (POV) for long-term investments.

There are multiple models and methods used in creating price forecasts for electricity:

  • Multi-agent model: builds price forecasts by matching demand to supply, simulating the operation of a heterogeneous system of generation units and companies
  • Fundamental model: focuses on simulating the physical and economic relationships influencing the trading of electricity, such as weather, fleet conditions, and load
  • Statistical model: uses mathematical regression models, basically a combination of previous day/month/year prices and other input variables like weather or load
  • Computational intelligence model: this class of models mainly uses deep neural networks or support vector machine methods to predict prices; these methods are good at covering the non-linear aspects of a price curve, such as price spikes
  • Hybrid model: a combination of two or more of the above models

 

We will use the computational intelligence model based on Fritz Arnold's proposal for this exploratory exercise.

Proposal :

In this paper, the author explores different approaches to creating an accurate prediction of wholesale energy prices. The one that is the subject of this article uses purely time-series data from previous years to learn long-term (one year) and short-term (one day) price patterns.

Data:

Historical hourly Day-Ahead market clearing prices for the German bidding zone.

Platform :

We will use Amazon SageMaker as our data science and machine learning platform :

Amazon SageMaker is a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy machine learning (ML) models quickly. SageMaker removes the heavy lifting from each step of the machine learning process to make it easier to develop high quality models.

Traditional ML development is a complex, expensive, iterative process made even harder because there are no integrated tools for the entire machine learning workflow. You need to stitch together tools and workflows, which is time-consuming and error-prone. SageMaker solves this challenge by providing all of the components used for machine learning in a single toolset so models get to production faster with much less effort and at lower cost.

Platform Architecture :


SageMaker Studio unifies at last all the tools needed for ML development. Developers can write code, track experiments, visualize data, and perform debugging and monitoring all within a single, integrated visual interface, which significantly boosts developer productivity.


Timeseries analysis

Loading the data: we read the data input from a CSV file, which consists of historical prices.


Visualizing the history: 10 years of DA prices


Model Architecture :

This model is hybrid: it uses a combination of a convolutional neural network, with a kernel size and stride of 24 corresponding to the number of hours in a day, and a Long Short-Term Memory (LSTM) network, to help the model learn price patterns over long and short periods of time.
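
An indicative Keras sketch of such an architecture is below; the layer sizes, the four-week input window, and the 24-hour output horizon are assumptions for illustration rather than the exact model from the paper.

from keras.models import Sequential
from keras.layers import Conv1D, LSTM, Dense

input_hours = 24 * 7 * 4   # e.g. four weeks of hourly prices as the input window

model = Sequential()
# one convolution summarizes each day: kernel size and stride of 24 hours
model.add(Conv1D(filters=32, kernel_size=24, strides=24,
                 activation="relu", input_shape=(input_hours, 1)))
# the LSTM learns longer-range patterns across the daily summaries
model.add(LSTM(64))
# predict the next day's 24 hourly prices
model.add(Dense(24))
model.compile(loss="mse", optimizer="adam")
model.summary()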


Predictions

Predictions were plotted for two horizons: one full year and two weeks.

Deploying the Model:

Once the model is trained and saved, we can use AWS endpoints to deploy it as an API.

An API-backed deployment basically wraps the model in a web application to make it available for inference.

Conclusion

An EPF computational intelligence model based only on time-series analysis does provide good results when it comes to learning the general patterns of prices: seasonality of surges, quarterly and monthly patterns, and weekday vs. weekend. But it doesn't perform well on daily price surges, which are critical to energy traders; these impulses are not very frequent, but when they happen they offer a great revenue opportunity in the Real-Time market.

The rest of the project does provide a solution to help mitigate this issue, which consists of using an MLP (multilayer perceptron) that can be fed different inputs like weather, load, etc., to predict prices. The accuracy of the model improves when it comes to predicting price spikes, but as stated above, computational intelligence models are not probabilistic in nature.

 

Graph databases and Energy: Modeling LMPs and CRRs

Graph Databases :

Graph databases are NoSQL databases that use the graph data model, comprised of vertices (an entity such as a person, place, object or relevant piece of data) and edges, which represent the relationship between two vertices.

Graph databases are particularly helpful because they highlight the links and relationships between relevant data similarly to how we do so ourselves.

This is an example of a graph data model:


In the graph data model above we can see the common entities: User, Movie, Genre

The common relationships are: Rates, Follows, Has.

Modeling Locational Marginal Prices (LMPs) and Congestion Revenue Rights (CRRs):

What are LMPs?

LMPs represent the cost to buy and sell power at different locations within wholesale electricity markets, usually called Independent System Operators (ISOs). Examples of ISOs include ERCOT, PJM, ISONE, MISO, CAISO, and NYISO. LMPs are made up of three components: Energy Price, Congestion Cost, and Losses. Most ISOs have Day Ahead and Real Time LMPs. Day-ahead LMPs represent prices in day-ahead markets which let market participants buy and sell wholesale electricity a day before the operating day to avoid volatility. Real-time LMPs represent prices in real time markets which let participants buy and sell power during the day of operation.

As a simplified example, let's say you lived in a neighborhood and at noon today you expected you would have 100 MWs of electricity demand. Yesterday you would have bought 100 MWs of electricity to be delivered at 12 today on the day-ahead market. However, when 12 today rolls around and demand is actually 105 MWs, you would buy the additional 5 MWs on the real-time market. Real-time market prices are generally more volatile than day-ahead market prices.

Example

Without congestion

This is a simplified scenario where we have two generators and one consumer: the first generator produces 10 MW at a price of $4 per MWh, the second produces 20 MW at a price of $2 per MWh, and the lines are able to transmit 10 MW to the consumer's premises.

No congestion exists and Generator 2 has the lowest offer and can serve the entire load. Since the LMP is calculated as the cost of the next MW needed and Generator 2 can supply the next MW at $2, the LMP is $2/MWh. Because there's no congestion on the network, the cheaper LMP of $2/MWh applies to all the nodes, and the 10 MW load costs $20.

In this case, the LMP will be $2/MWh and the payments/charges would look something like this:

As we can see, Generator 2 was able to provide the entirety of the power needed, and the LMP is $2/MWh at all three buses (without accounting for line losses).

With Congestion:

In the next scenario we will introduce the concept of congestion:

The power line from Generator 2 has a limit of 4 MW, meaning that the maximum power Generator 2 can put on the grid is 4 MW; the remaining 6 MW that the consumer needs will be provided by Generator 1, and this will influence the LMPs at all three buses.

Congestion exists and Generator 2 is not fully utilized; there's a constraint on Generator 2 and it can only serve 4 MW. Now the lowest-priced generator cannot serve the entire load; 6 MW must come from elsewhere, namely Generator 1.

The LMP at Generator 2 is still $2/MWh, so for the 4 MW it supplies, $8 will be paid to Generator 2. Generator 1 will supply 6 MW at $4/MWh, so it should receive a payment of $24. Because congestion exists between Generator 2 and the load, the two LMPs are different, and the more expensive generation cost of $4/MWh is used when charging the load for the 10 MW; $40 is owed for the 10 MW it used for an hour.
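
As a small sanity check of the settlement arithmetic above, here it is in a few lines of Python (the congestion-rent line is the standard interpretation of the leftover difference, which funds CRRs, and is an addition to the example):

# Congested case: Generator 2 is capped at 4 MW, Generator 1 covers the rest,
# and the load is charged at the marginal generator's LMP of $4/MWh.
load_mw = 10
gen2_mw, gen2_lmp = 4, 2.0                  # limited by the 4 MW line
gen1_mw, gen1_lmp = load_mw - gen2_mw, 4.0

gen2_payment = gen2_mw * gen2_lmp           # $8 paid to Generator 2
gen1_payment = gen1_mw * gen1_lmp           # $24 paid to Generator 1
load_charge = load_mw * gen1_lmp            # $40 charged to the load

congestion_rent = load_charge - gen1_payment - gen2_payment   # $8 surplus collected by the ISO
print(gen2_payment, gen1_payment, load_charge, congestion_rent)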

In the following example, I have tried to model both scenarios described above.

There are 3 types of nodes:

Generator: a generation resource in the grid, it can be a power plant, wind or solar farm.

Consumer: a consumer entity ( residential, commercial …)

Connection: represents a connection between power lines on the grid

The relationships basically represent the capacity of the transmission line, in this case, 4MW and 10MW

Here is a link to the java code on GitHub


We are using Spring Boot as the application framework, and the node types from the previous example are modeled as classes.

To run the example, compile the application to generate the binaries and then start it.

In this example we have a Neo4j server running locally; if you go to http://localhost:7474 you will be able to see the graph:


In the graph, we can see all the nodes and relationships that were generated:

Generators, Connections, and Customers

If we look at the node properties, we can inspect each node's attributes directly in the Neo4j browser.

Using Smart Meter Data for consumer electricity usage forecasting

Electric energy consumption is essential for promoting economic development and raising the standard of living. In contrast to other energy sources, electric energy cannot be stored for large-scale consumption. From an economic viewpoint, the supply and demand of electric energy must be balanced at any given time. Therefore, a precise forecasting of electric energy consumption is very important for the economic operation of an electric power grid.

The ability to create a forecasting model for an individual consumer can help determine the overall load on the grid for a given time.

For this post, we will treat this as a time series prediction problem and only use a single variable (univariate forecasting). Check back for a future post where we introduce multivariate forecasting.

The Dataset :

I used the Smart Meter Texas (SMT) portal to access my home electricity usage. SMT stores daily, monthly and even 15-minute-interval energy data. The energy data is recorded by digital electric meters (commonly known as smart meters), and the portal provides secure access to the data for customers and authorized market participants.

In addition to acting as an interface for access to smart meter data, SMT enables secure communications with the customer's in-home devices and provides a convenient and easy-to-use process where customers can voluntarily authorize market participants other than the customer's retail electric provider, or third parties, to access their energy information and in-home devices.

Tools :

Keras: Keras is an open source neural network library written in Python. It is capable of running on top of TensorFlow, Microsoft Cognitive Toolkit, Theano, or PlaidML. Designed to enable fast experimentation with deep neural networks, it focuses on being user-friendly, modular, and extensible

TensorFlow : TensorFlow is an open-source software library for dataflow programming across a range of tasks. It is a symbolic math library and is also used for machine learning applications such as neural networks

The Algorithm: LSTM

In our case, we will be using a variant of Recurrent Neural Network (RNN) called Long Short-Term Memory (LSTM). Why? Time series problems are a difficult type of prediction modeling; LSTMs are good at extracting patterns from input features over long periods of time, can retain them in memory, and use them to predict the next sequences. My usage data covers three months of consumption in kWh, at 15-minute intervals.

For this example, I will only use a subset of the overall dataset: 3 days of electricity usage.

[code]

DATE,USAGE_KWH
11/01/18 00:15,0.005
11/01/18 00:30,0.005
11/01/18 00:45,0.013
11/01/18 01:00,0.029
11/01/18 01:15,0.025
11/01/18 01:30,0.004
11/01/18 01:45,0.005
11/01/18 02:00,0.004
11/01/18 02:15,0.024

[/code]


python :

loading the dataset:

[code]

# load the dataset
from pandas import read_csv

dataframe = read_csv('AMS.csv', usecols=[1], engine='python')
dataset = dataframe.values
dataset = dataset.astype('float32')

[/code]

Training Set and Test Set :

[code]

train_size = int(len(dataset) * 0.67)
test_size = len(dataset) - train_size
train, test = dataset[0:train_size,:], dataset[train_size:len(dataset),:]

[/code]
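
The LSTM below is fit on (trainX, trainY) windows whose construction is not shown; a standard windowing step consistent with input_shape=(1, look_back) would look roughly like this (look_back = 1 is an assumption):

[code]

import numpy

# build supervised (X, y) pairs: X = usage at t..t+look_back-1, y = usage at t+look_back
def create_dataset(dataset, look_back=1):
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back - 1):
        dataX.append(dataset[i:(i + look_back), 0])
        dataY.append(dataset[i + look_back, 0])
    return numpy.array(dataX), numpy.array(dataY)

look_back = 1
trainX, trainY = create_dataset(train, look_back)
testX, testY = create_dataset(test, look_back)

# LSTM input must be shaped [samples, time steps, features]
trainX = numpy.reshape(trainX, (trainX.shape[0], 1, trainX.shape[1]))
testX = numpy.reshape(testX, (testX.shape[0], 1, testX.shape[1]))

[/code]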

creating model :

[code]

from keras.models import Sequential
from keras.layers import LSTM, Dense, Activation

model = Sequential()
model.add(LSTM(4, input_shape=(1, look_back)))
model.add(Dense(1))
model.add(Activation('tanh'))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(trainX, trainY, epochs=50, batch_size=1)

[/code]

making predictions:

[code]

trainPredict = model.predict(trainX)
testPredict = model.predict(testX)

[/code]

for plotting we shift the training and test sets :

[code]
# shift train predictions for plotting
trainPredictPlot = numpy.empty_like(dataset)
trainPredictPlot[:, :] = numpy.nan
trainPredictPlot[look_back:len(trainPredict)+look_back, :] = trainPredict
# shift test predictions for plotting
testPredictPlot = numpy.empty_like(dataset)
testPredictPlot[:, :] = numpy.nan
testPredictPlot[len(trainPredict)+(look_back*2)+1:len(dataset)-1, :] = testPredict
[/code]

Plotting the results, with the following color coding:

Blue: actual usage

Orange: Test Set

Green: Predictions

Zooming in on the prediction portion, this is good forecasting, with an RMSE of 0.19.

Summary :

With the electricity market undergoing a revolution, load forecasts have gained much more significance, spreading across other business departments like energy trading and financial planning. Accurate load forecasts are the basis for most reliability organizations' operations, like the Electric Reliability Council of Texas, more commonly known as ERCOT. Accurate load forecasting will become even more important with Smart Grids. Smart Grids create the opportunity to proactively take action at the consumer level, storage level, and generation side, to avoid situations of energy scarcity and/or price surges.