Markov Decision Processes Using Custom PySpark

Crafting policies that lead to optimal outcomes is one of the most difficult challenges facing decision makers within an organization. The reason is that policies are not made in a world with perfect information and markets in equilibrium; they are made in complex systems, where the behavior of the entities within the system is dynamic and generally uncertain.

Recently, a field of machine learning called Reinforcement Learning (RL) has gained popularity for its utility in modeling complex behavior to identify an optimal strategy. RL maps states or situations to actions in order to maximize some result or reward.

The Markov Decision Process (MDP) is a core component of the RL methodology. The Markov property assumes the future depends only on the present, not on the past. A Markov chain is a probabilistic model that uses the current state to predict the next state. For example, if today is cloudy, there may be a high probability that tomorrow will be rainy.
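As a minimal illustration of this idea (the weather states and probabilities below are made up for the example, not taken from the original post), a Markov chain can be simulated by repeatedly sampling the next state from the probability distribution attached to the current state:

import random

# Hypothetical weather transition probabilities: P(next state | current state)
weather_transitions = {
    'cloudy': {'rainy': 0.6, 'cloudy': 0.3, 'sunny': 0.1},
    'rainy':  {'rainy': 0.5, 'cloudy': 0.4, 'sunny': 0.1},
    'sunny':  {'sunny': 0.7, 'cloudy': 0.2, 'rainy': 0.1},
}

def next_state(current):
    # Draw a uniform random number and walk the cumulative distribution
    p = random.uniform(0, 1)
    cumulative = 0.0
    for state, prob in weather_transitions[current].items():
        cumulative += prob
        if p <= cumulative:
            return state
    return current

print(next_state('cloudy'))   # 'rainy' about 60% of the time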

In most instances of MDP, entities are referred to as “agents,” who operate within a dynamic environment to find an optimal behavior solution to a particular challenge via trial and error. The behavior of each agent can be adjusted, based on the testing of a policy.

The behavior is often a sequence of transitions: a decision is made, an action is performed, a reward is received, and a new decision is made. The outcome of each decision is not fully predictable, but it can be anticipated to some extent through the transition probability distribution before the next decision is made.

The interesting thing about RL is not the large amount of data needed to create and tune the model (although having it would improve the model) but the amount of data these models generate. Modeling a complex system is a simulation, and these simulations produce data that, when analyzed, yields insights into policy inefficiencies that can be improved.

This blog post uses PySpark to scale an MDP example problem. When simulating complex systems, it can be very challenging to scale to large numbers of agents, due to the amount of in-memory processing required as each agent steps through its transitions. PySpark allows us to leverage Spark for distributed data processing and Python to define the states and actions of the agents.

Example

An organization has equipment that is evaluated at the end of every week. It can be in one of three conditions at evaluation time: Good, Acceptable, or Bad. Historical maintenance data indicates that, depending on the condition of the equipment in the previous week, the equipment's condition may change the next week or remain the same. In this example, the condition of the equipment is the state.

These are the agent transition states:

  1. If the equipment was Good the previous week, there is a 60% chance it will be Good the next week, a 30% chance it will be Acceptable, and a 10% chance it will be Bad.
  2. If the equipment was Acceptable the previous week, there is a 0% chance it will be Good, a 60% chance it will be Acceptable, and a 40% chance it will be Bad the following week.
  3. If the equipment was Bad the previous week, there is a 100% chance it will be Bad the next week.

The actions to address these states are:

  1. Perform no maintenance or replacement.
  2. Perform maintenance where it would improve Bad equipment to Acceptable and Acceptable to Good. The cost to perform maintenance is $500.
  3. Replace the equipment, which immediately makes it Good condition. The cost of replacing is $2000.

The rewards are associated with each piece of equipment. When the equipment is in:

  • Good condition – it is worth $1000
  • Acceptable condition – it is worth $500
  • Bad condition – it loses $500

Management is planning for an operating budget and expenses for the next two years. It needs to determine the best policy to maximize the marginal gains afforded by each productive piece of equipment and minimize the losses associated with defective equipment.
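Before looking at the candidate policies, it helps to see the problem parameters collected in one place. The following is just a sketch of the states, transition probabilities, action costs, and rewards described above; the dictionary names are my own, and the simulation code later in the post encodes the same numbers directly inside its functions.

# Transition probabilities when no action is taken: P(next condition | current condition)
transition_probs = {
    'good':       {'good': 0.6, 'acceptable': 0.3, 'bad': 0.1},
    'acceptable': {'good': 0.0, 'acceptable': 0.6, 'bad': 0.4},
    'bad':        {'good': 0.0, 'acceptable': 0.0, 'bad': 1.0},
}

# Cost of each action and the weekly reward (or penalty) for each condition
action_costs = {'none': 0, 'maintenance': 500, 'replacement': 2000}
rewards      = {'good': 1000, 'acceptable': 500, 'bad': -500}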

The policies under consideration are:

  1. Replace the equipment when it is in Bad condition only.
  2. Replace the equipment when it is in Bad condition and perform maintenance when it is in Acceptable condition.
  3. Replace the equipment when it is in Bad condition and Acceptable condition.

There are three general methods of solving MDP problems:

  1. Value Iteration Method – for finite horizon problems
  2. Policy Iteration – for infinite horizon problems
  3. Linear Programming – for infinite horizon problems

Here I will be using the Value Iteration Method.

We have a set of defined states and outcomes, and this approach does not require solving a system of simultaneous equations; at each iteration, the action and transition can be performed quickly and simply.
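For reference (not shown in the original post), the textbook value iteration update is the Bellman equation, where \(V_k(s)\) is the value of state \(s\) after \(k\) iterations, \(R(s,a)\) is the reward for taking action \(a\) in state \(s\), and \(P(s' \mid s,a)\) is the transition probability:

\[
V_{k+1}(s) = \max_{a} \Big[ R(s,a) + \gamma \sum_{s'} P(s' \mid s,a)\, V_k(s') \Big]
\]

Here \(\gamma\) is the discount factor; since this example does not apply discounting (as noted at the end of the post), it effectively works with \(\gamma = 1\).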

I am using the MapR Data Science Refinery (DSR) that has a Zeppelin notebook already installed and configured to work with Spark 2.1.0 and Python 2.7.5. The code will also work in the MapR Sandbox within the PySpark shell. The code for this post is on my GitHub: https://github.com/JustinBurg.

I created an Agent class in a separate Python script that will be saved to the MapR File System (MapR-FS) so that it can be accessed by all Spark workers. At the command line:

$ vi agent.py

Copy and paste this in:

class Agent:
    def __init__(self, row):
        # Each agent is identified by the first field of its input row
        self.id = row[0]

Save this into MapR-FS in the tmp/ directory, and check to make sure it is there:

$ hadoop fs -put agent.py tmp/agent.py

$ hadoop fs -ls tmp/

Okay, now we can start PySpark. If you are using the PySpark shell in the MapR Sandbox, use the following:

$ /opt/mapr/spark/spark-2.1.0/bin/pyspark

Load dependencies:

from pyspark import SparkConf, SparkContext
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, DoubleType, FloatType, DateType, TimestampType
import pyspark.sql.functions as func
from pyspark.sql.functions import date_format, col, desc, udf, from_unixtime, unix_timestamp, date_sub, date_add, last_day
from pyspark.sql.functions import round, sum, lit, add_months, coalesce, max
from pyspark.sql.functions import rand, randn
from pyspark.ml.linalg import Vectors
from pyspark.mllib.random import RandomRDDs
from collections import OrderedDict
from datetime import datetime
import numpy as np
import random
import time

In the Zeppelin notebook, I am using the PySpark kernel to add our agent.py script and import our Agent class. If you are using the sandbox, the path will be something like:

sc.addPyFile("maprfs:///user/user01/tmp/agent.py")

from agent import Agent

Create a PySpark RDD that will be the data object we apply the transition and policy action functions to:

num_agents = 10000
agentRDD = sqlContext.createDataFrame(zip(range(1, num_agents + 1)), ["id"]).rdd
agentRDD.take(10)

Verify the type of rdd is ‘pyspark.rdd.RDD’:

type(agentRDD)
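As an aside, an equivalent RDD of Row(id=...) objects could also be built directly with sc.parallelize (just a sketch of an alternative, not how the original post does it); either works here because the simulation only reads row[0]:

agentRDD_alt = sc.parallelize(range(1, num_agents + 1)).map(lambda i: Row(id=i))
agentRDD_alt.take(10)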

Create Agent function:

def create_agent(row):
    agent = Agent(row)
    agent.current_state = 'good'
    agent.active = True
    agent.total_benefit = 1000
    agent.total_cost = 0
    agent.maintenance_performed = 'N'
    agent.number_maintenance_performed = 0
    agent.replacement = 'N'
    agent.number_replacements = 0
    agent.policy = 0
    return agent

Here we initialize our equipment agents. Their current state is ‘good,’ and their initial benefit is $1000 as they are new. We create variables for capturing cost, benefit, maintenance, replacement, and the policy applied to the equipment.

Next, we create our transition and reward function. This will transition each piece of equipment into a new state at every time step. A reward or penalty will be assessed based on the new state.

def update_status(row):
    agent = row
    p = random.uniform(0, 1)
    if agent.current_state == 'good' and agent.replacement == 'N':
        if 0.6 < p <= 0.9:                      # 30% chance: good -> acceptable
            agent.current_state = 'acceptable'
            agent.total_benefit += 500
        elif p > 0.9:                           # 10% chance: good -> bad
            agent.current_state = 'bad'
            agent.total_cost += 500
        else:                                   # 60% chance: stays good
            agent.current_state = 'good'
            agent.total_benefit += 1000
    elif agent.current_state == 'good' and agent.replacement == 'Y':
        agent.current_state = 'good'            # newly replaced equipment stays good
        agent.total_benefit += 1000
        agent.replacement = 'N'
    elif agent.current_state == 'good' and agent.maintenance_performed == 'Y':
        agent.current_state = 'good'            # newly maintained equipment stays good
        agent.total_benefit += 1000
        agent.maintenance_performed = 'N'
    elif agent.current_state == 'acceptable':
        if p > 0.6:                             # 40% chance: acceptable -> bad
            agent.current_state = 'bad'
            agent.total_cost += 500
        else:                                   # 60% chance: stays acceptable
            agent.current_state = 'acceptable'
            agent.total_benefit += 500
    else:                                       # bad equipment stays bad until acted on
        agent.current_state = 'bad'
        agent.total_cost += 500
    return agent
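As an optional sanity check (a sketch of my own, not part of the original post), you can apply update_status once to many freshly created agents and confirm that the empirical first-week transition frequencies from the Good state are roughly 60/30/10:

from collections import Counter

# Each fresh agent starts in 'good' with replacement == 'N', so the first branch applies
counts = Counter(update_status(create_agent(Row(id=i))).current_state for i in range(10000))
print(counts)   # expect roughly {'good': ~6000, 'acceptable': ~3000, 'bad': ~1000}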

Create our policy functions:

def policy_one(row):
    agent = row
    if agent.current_state == 'bad':
        agent.replacement = 'Y'
        agent.number_replacements += 1
        agent.total_cost += 2000         # replacement cost
        agent.current_state = 'good'
    return agent

def policy_two(row):
    agent = row
    if agent.current_state == 'acceptable':
        agent.maintenance_performed = 'Y'
        agent.number_maintenance_performed += 1
        agent.total_cost += 500          # cost of fixing equipment
        agent.current_state = 'good'
    if agent.current_state == 'bad':
        agent.replacement = 'Y'
        agent.number_replacements += 1
        agent.total_cost += 2000         # replacement cost
        agent.current_state = 'good'
    return agent

def policy_three(row):
    agent = row
    if agent.current_state == 'acceptable' or agent.current_state == 'bad':
        agent.replacement = 'Y'
        agent.number_replacements += 1
        agent.total_cost += 2000         # replacement cost
        agent.current_state = 'good'
    return agent

Now we have captured the states, transitions, actions, and rewards that will be applied to the equipment. We need to create the function that will apply these components in a simulation.

def mdp_simulation(row, time, policy):
    results = []
    agent = create_agent(row)
    for week in range(time):
        if week < 1:
            results.append((week,
                            agent.id,
                            agent.current_state,
                            agent.total_benefit,
                            agent.total_cost,
                            agent.maintenance_performed,
                            agent.number_maintenance_performed,
                            agent.replacement,
                            agent.number_replacements))
        else:
            update_status(agent)
            results.append((week,
                            agent.id,
                            agent.current_state,
                            agent.total_benefit,
                            agent.total_cost,
                            agent.maintenance_performed,
                            agent.number_maintenance_performed,
                            agent.replacement,
                            agent.number_replacements))
            policy(agent)
    return results

This is the function that will be applied directly to the agentRDD we created. It calls the create_agent function to initialize our agents. Over the course of a specified time frame, each piece of equipment transitions, that transition is recorded in a list called results, and then the policy being evaluated is applied to the equipment at that time step. The final output for each agent is a list of tuples, one per week, which we will convert into a PySpark DataFrame to analyze.
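Before distributing the work, it can be helpful to run the simulation locally for a single agent over a few weeks to see the shape of the result tuples (a quick sketch of my own, not from the original post):

# One agent, five weeks, Policy One; each tuple is one week of that agent's history
for record in mdp_simulation(Row(id=1), 5, policy_one):
    print(record)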

To see what the simulation generates, let's apply Policy One to the agentRDD:

cols = ("Week", "id", "current\_state", "total\_benefit", "total_cost",

 "maintenance\_performed", "number\_maintenance_performed",

"replacement","number_replacements")

t = 100

policy\_One\_results = agentRDD.flatMap(lambda a:

mdp\_simulation(a,t,policy\_one)).toDF().orderBy('\_1','\_2').toDF(*cols)

policy\_One\_results.show(100,False)

policy\_One\_summary = policy\_One\_results.select(\[func.sum(policy\_One\_results.total\_benefit).alias("Total\_Gains"),  func.sum(policy\_One\_results.total\_cost).alias("Total\_Cost"),  max(policy\_One\_results.number\_replacements).alias("Max\_Number\_of\_Replacements")\])

policy\_One\_summary.show()

+-----------+-----------+--------------------------+
|Total_Gains| Total_Cost|Max_Number_of_Replacements|
+-----------+-----------+--------------------------+
|32784435000|22446961500|                        26|
+-----------+-----------+--------------------------+

Now run Policy Two and Policy Three:

policy_Two_results = agentRDD.flatMap(lambda a: mdp_simulation(a, t, policy_two)).toDF().orderBy('_1', '_2').toDF(*cols)

policy_Three_results = agentRDD.flatMap(lambda a: mdp_simulation(a, t, policy_three)).toDF().orderBy('_1', '_2').toDF(*cols)

policy_Two_summary = policy_Two_results.select([func.sum(policy_Two_results.total_benefit).alias("Total_Gains"),
                                                func.sum(policy_Two_results.total_cost).alias("Total_Cost"),
                                                max(policy_Two_results.number_replacements).alias("Max_Number_of_Replacements")])
policy_Two_summary.show()

+-----------+-----------+--------------------------+
|Total_Gains| Total_Cost|Max_Number_of_Replacements|
+-----------+-----------+--------------------------+
|39232216500|17712767500|                        22|
+-----------+-----------+--------------------------+

policy_Three_summary = policy_Three_results.select([func.sum(policy_Three_results.total_benefit).alias("Total_Gains"),
                                                    func.sum(policy_Three_results.total_cost).alias("Total_Cost"),
                                                    max(policy_Three_results.number_replacements).alias("Max_Number_of_Replacements")])
policy_Three_summary.show()

+-----------+-----------+--------------------------+
|Total_Gains| Total_Cost|Max_Number_of_Replacements|
+-----------+-----------+--------------------------+
|41620343500|29656050000|                        39|
+-----------+-----------+--------------------------+

Comparing our output:

Policy Two yields the highest net gain, with the lowest costs and the fewest replacements per machine, while Policy Three yields the highest gross gains but also the highest costs.
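A quick way to make that comparison explicit is to compute the net gain (total gains minus total costs) from each summary DataFrame; a sketch:

for name, summary in [("Policy One", policy_One_summary),
                      ("Policy Two", policy_Two_summary),
                      ("Policy Three", policy_Three_summary)]:
    row = summary.collect()[0]
    print(name, "net gain:", row["Total_Gains"] - row["Total_Cost"])

Using the numbers above, that works out to roughly $10.3B for Policy One, $21.5B for Policy Two, and $12.0B for Policy Three.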

This example does not take into account discounting, which is important in many economic settings, given the time value of money: the value of $1 now is not the same as its value in 3 years' time. For example, at a 5% annual discount rate, $1,000 received three years from now is worth only about $864 today. This model also assumes that all pieces of equipment are the same, which is often not the case.

Hopefully, this post showed the capability of PySpark as a tool for MDP problems involving many transitions across a large population of agents, and introduced you to using custom functions to simulate complex, sequential decision problems under uncertainty.

 

Continue at: https://mapr.com/blog/markov-decision-processes-using-custom-pyspark-udfs/
