{ "cells": [ { "cell_type": "markdown", "id": "9d41c333", "metadata": {}, "source": [ "# RHC Example: Double integrator with bounded input\n", "\n", "Richard M. Murray, 3 Feb 2022 (updated 29 Jan 2023)\n", "\n", "To illustrate the implementation of a receding horizon controller, we\n", "consider a linear system corresponding to a double integrator with\n", "bounded input:\n", "\n", "$$\n", " \\dot x = \\begin{bmatrix} 0 & 1 \\\\ 0 & 0 \\end{bmatrix} x + \\begin{bmatrix} 0 \\\\ 1 \\end{bmatrix} \\text{clip}(u)\n", " \\qquad\\text{where}\\qquad\n", " \\text{clip}(u) = \\begin{cases}\n", " -1 & u < -1, \\\\\n", " u & -1 \\leq u \\leq 1, \\\\\n", " 1 & u > 1.\n", " \\end{cases}\n", "$$\n", "\n", "We implement a model predictive controller by choosing\n", "\n", "$$\n", " Q_x = \\begin{bmatrix} 1 & 0 \\\\ 0 & 0 \\end{bmatrix}, \\qquad\n", " Q_u = \\begin{bmatrix} 1 \\end{bmatrix}, \\qquad\n", " P_1 = \\begin{bmatrix} 0.1 & 0 \\\\ 0 & 0.1 \\end{bmatrix}.\n", "$$" ] }, { "cell_type": "code", "execution_count": null, "id": "4fe0af7f", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import scipy as sp\n", "import matplotlib.pyplot as plt\n", "import control as ct\n", "import control.optimal as opt\n", "import control.flatsys as fs\n", "import time" ] }, { "cell_type": "markdown", "id": "4c695f81", "metadata": {}, "source": [ "## System definition\n", "\n", "The system is defined as a double integrator with bounded input." ] }, { "cell_type": "code", "execution_count": null, "id": "5c01f571", "metadata": {}, "outputs": [], "source": [ "def doubleint_update(t, x, u, params):\n", " # Get the parameters\n", " lb = params.get('lb', -1)\n", " ub = params.get('ub', 1)\n", " assert lb < ub\n", "\n", " # bound the input\n", " u_clip = np.clip(u, lb, ub)\n", "\n", " return np.array([x[1], u_clip[0]])\n", "\n", "proc = ct.NonlinearIOSystem(\n", " doubleint_update, None, name=\"double integrator\",\n", " inputs = ['u'], outputs=['x[0]', 'x[1]'], states=2)" ] }, { "cell_type": "markdown", "id": "6c2f0d00", "metadata": {}, "source": [ "## Receding horizon controller\n", "\n", "To define a receding horizon controller, we create an optimal control problem (using the OptimalControlProblem class) and then use the compute_trajectory method to solve for the trajectory from the current state.\n", "\n", "We start by defining the cost functions, which consists of a trajectory cost and a terminal cost:" ] }, { "cell_type": "code", "execution_count": null, "id": "a501efef", "metadata": {}, "outputs": [], "source": [ "Qx = np.diag([1, 0]) # state cost\n", "Qu = np.diag([1]) # input cost\n", "traj_cost=opt.quadratic_cost(proc, Qx, Qu)\n", "\n", "P1 = np.diag([0.1, 0.1]) # terminal cost\n", "term_cost = opt.quadratic_cost(proc, P1, None)" ] }, { "cell_type": "markdown", "id": "c5470629", "metadata": {}, "source": [ "We also set up a set of constraints the correspond to the fact that the input should have magnitude 1. This can be done using either the [input_range_constraint](https://python-control.readthedocs.io/en/0.9.3.post2/generated/control.optimal.input_range_constraint.html) function or the [input_poly_constraint](https://python-control.readthedocs.io/en/0.9.3.post2/generated/control.optimal.input_poly_constraint.html) function." ] }, { "cell_type": "code", "execution_count": null, "id": "cb4c511a", "metadata": {}, "outputs": [], "source": [ "traj_constraints = opt.input_range_constraint(proc, -1, 1)\n", "# traj_constraints = opt.input_poly_constraint(\n", "# proc, np.array([[1], [-1]]), np.array([1, 1]))" ] }, { "cell_type": "markdown", "id": "a5568374", "metadata": {}, "source": [ "We define the horizon for evaluating finite-time, optimal control by setting up a set of time points across the designed horizon. The input will be computed at each time point." ] }, { "cell_type": "code", "execution_count": null, "id": "9edec673", "metadata": {}, "outputs": [], "source": [ "Th = 5\n", "timepts = np.linspace(0, Th, 11, endpoint=True)\n", "print(timepts)" ] }, { "cell_type": "markdown", "id": "cb8fcecc", "metadata": {}, "source": [ "Finally, we define the optimal control problem that we want to solve (without actually solving it)." ] }, { "cell_type": "code", "execution_count": null, "id": "e9f31be6", "metadata": {}, "outputs": [], "source": [ "# Set up the optimal control problem\n", "ocp = opt.OptimalControlProblem(\n", " proc, timepts, traj_cost,\n", " terminal_cost=term_cost,\n", " trajectory_constraints=traj_constraints,\n", " # terminal_constraints=term_constraints,\n", ")" ] }, { "cell_type": "markdown", "id": "ee9a39dd", "metadata": {}, "source": [ "To make sure that the problem is properly defined, we solve the problem for a specific initial condition. We also compare the amount of time required to solve the problem from a \"cold start\" (no initial guess) versus a \"warm start\" (use the previous solution, shifted forward on point in time)." ] }, { "cell_type": "code", "execution_count": null, "id": "887295eb", "metadata": {}, "outputs": [], "source": [ "X0 = np.array([1, 1])\n", "\n", "start_time = time.process_time()\n", "res = ocp.compute_trajectory(X0, initial_guess=0, return_states=True)\n", "stop_time = time.process_time()\n", "print(f'* Cold start: {stop_time-start_time:.3} sec')\n", "\n", "# Resolve using previous solution (shifted forward) as initial guess to compare timing\n", "start_time = time.process_time()\n", "u = res.inputs\n", "u_shift = np.hstack([u[:, 1:], u[:, -1:]])\n", "ocp.compute_trajectory(X0, initial_guess=u_shift, print_summary=False)\n", "stop_time = time.process_time()\n", "print(f'* Warm start: {stop_time-start_time:.3} sec')" ] }, { "cell_type": "markdown", "id": "115dec26", "metadata": {}, "source": [ "(In this case the timing is not that different since the system is very simple.)\n", "\n", "Plotting the result, we see that the solution is properly computed." ] }, { "cell_type": "code", "execution_count": null, "id": "4b98e773", "metadata": {}, "outputs": [], "source": [ "plt.plot(res.time, res.states[0], 'k-', label='$x_1$')\n", "plt.plot(res.time, res.inputs[0], 'b-', label='u')\n", "plt.xlabel('Time [s]')\n", "plt.ylabel('$x_1$, $u$')\n", "plt.legend();" ] }, { "cell_type": "markdown", "id": "0e85981a", "metadata": {}, "source": [ "We implement the receding horicon controller using a function that we can with different versions of the problem." ] }, { "cell_type": "code", "execution_count": null, "id": "eb2e8126", "metadata": {}, "outputs": [], "source": [ "# Create a figure to use for plotting\n", "def run_rhc_and_plot(\n", " proc, ocp, X0, Tf, print_summary=False, verbose=False, ax=None, plot=True): \n", " # Start at the initial point\n", " x = X0\n", " \n", " # Initialize the axes\n", " if plot and ax is None:\n", " ax = plt.axes()\n", " \n", " # Initialize arrays to store the final trajectory\n", " time_, inputs_, outputs_, states_ = [], [], [], []\n", " \n", " # Generate the individual traces for the receding horizon control\n", " for t in ocp.timepts:\n", " # Compute the optimal trajectory over the horizon\n", " start_time = time.process_time()\n", " res = ocp.compute_trajectory(x, print_summary=print_summary)\n", " if verbose:\n", " print(f\"{t=}: comp time = {time.process_time() - start_time:0.3}\")\n", "\n", " # Simulate the system for the update time, with higher res for plotting\n", " tvec = np.linspace(0, res.time[1], 20)\n", " inputs = res.inputs[:, 0] + np.outer(\n", " (res.inputs[:, 1] - res.inputs[:, 0]) / (tvec[-1] - tvec[0]), tvec)\n", " soln = ct.input_output_response(proc, tvec, inputs, x)\n", " \n", " # Save this segment for later use (final point will appear in next segment)\n", " time_.append(t + soln.time[:-1])\n", " inputs_.append(soln.inputs[:, :-1])\n", " outputs_.append(soln.outputs[:, :-1])\n", " states_.append(soln.states[:, :-1])\n", "\n", " if plot:\n", " # Plot the results over the full horizon\n", " h3, = ax.plot(t + res.time, res.states[0], 'k--', linewidth=0.5)\n", " ax.plot(t + res.time, res.inputs[0], 'b--', linewidth=0.5)\n", "\n", " # Plot the results for this time segment\n", " h1, = ax.plot(t + soln.time, soln.states[0], 'k-')\n", " h2, = ax.plot(t + soln.time, soln.inputs[0], 'b-')\n", " \n", " # Update the state to use for the next time point\n", " x = soln.states[:, -1]\n", " \n", " # Append the final point to the response\n", " time_.append(t + soln.time[-1:])\n", " inputs_.append(soln.inputs[:, -1:])\n", " outputs_.append(soln.outputs[:, -1:])\n", " states_.append(soln.states[:, -1:])\n", "\n", " # Label the plot\n", " if plot:\n", " # Adjust the limits for consistency\n", " ax.set_ylim([-4, 3.5])\n", "\n", " # Add reference line for input lower bound\n", " ax.plot([0, 7], [-1, -1], 'k--', linewidth=0.666)\n", "\n", " # Label the results\n", " ax.set_xlabel(\"Time $t$ [sec]\")\n", " ax.set_ylabel(\"State $x_1$, input $u$\")\n", " ax.legend(\n", " [h1, h2, h3], ['$x_1$', '$u$', 'prediction'],\n", " loc='lower right', labelspacing=0)\n", " plt.tight_layout()\n", " \n", " # Append\n", " return ct.TimeResponseData(\n", " np.hstack(time_), np.hstack(outputs_), np.hstack(states_), np.hstack(inputs_))" ] }, { "cell_type": "markdown", "id": "be13e00a", "metadata": {}, "source": [ "Finally, we call the controller and plot the response. The solid lines show the portions of the trajectory that we follow. The dashed lines are the trajectory over the full horizon, but which are not followed since we update the computation at each time step. (To get rid of the statistics of each optimization call, use print_summary=False.)" ] }, { "cell_type": "code", "execution_count": null, "id": "305a1127", "metadata": {}, "outputs": [], "source": [ "Tf = 10\n", "rhc_resp = run_rhc_and_plot(proc, ocp, X0, Tf, verbose=True, print_summary=False)\n", "print(f\"xf = {rhc_resp.states[:, -1]}\")" ] }, { "cell_type": "markdown", "id": "6005bfb3", "metadata": {}, "source": [ "## RHC vs LQR vs LQR terminal cost\n", "\n", "In the example above, we used a receding horizon controller with the terminal cost as $P_1 = \\text{diag}(0.1, 0.1)$. An alternative is to set the terminal cost to be the LQR terminal cost that goes along with the trajectory cost, which then provides a \"cost to go\" that matches the LQR \"cost to go\" (but keeping in mind that the LQR controller does not necessarily respect the constraints).\n", "\n", "The following code compares the original RHC formulation with a receding horizon controller using an LQR terminal cost versus an LQR controller." ] }, { "cell_type": "code", "execution_count": null, "id": "ea2de1f3", "metadata": {}, "outputs": [], "source": [ "# Get the LQR solution\n", "K, P_lqr, E = ct.lqr(proc.linearize(0, 0), Qx, Qu)\n", "print(f\"P_lqr = \\n{P_lqr}\")\n", "\n", "# Create an LQR controller (and run it)\n", "lqr_ctrl, lqr_clsys = ct.create_statefbk_iosystem(proc, K)\n", "lqr_resp = ct.input_output_response(lqr_clsys, rhc_resp.time, 0, X0)\n", "\n", "# Create a new optimal control problem using the LQR terminal cost\n", "# (need use more refined time grid as well, to approximate LQR rate)\n", "lqr_timepts = np.linspace(0, Th, 25, endpoint=True)\n", "lqr_term_cost=opt.quadratic_cost(proc, P_lqr, None)\n", "ocp_lqr = opt.OptimalControlProblem(\n", " proc, lqr_timepts, traj_cost, terminal_cost=lqr_term_cost,\n", " trajectory_constraints=traj_constraints,\n", ")\n", "\n", "# Create the response for the new controller\n", "rhc_lqr_resp = run_rhc_and_plot(\n", " proc, ocp_lqr, X0, 10, plot=False, print_summary=False)\n", "\n", "# Plot the different responses to compare them\n", "fig, ax = plt.subplots(2, 1)\n", "ax[0].plot(rhc_resp.time, rhc_resp.states[0], label='RHC + P_1')\n", "ax[0].plot(rhc_lqr_resp.time, rhc_lqr_resp.states[0], '--', label='RHC + P_lqr')\n", "ax[0].plot(lqr_resp.time, lqr_resp.outputs[0], ':', label='LQR')\n", "ax[0].legend()\n", "\n", "ax[1].plot(rhc_resp.time, rhc_resp.inputs[0], label='RHC + P_1')\n", "ax[1].plot(rhc_lqr_resp.time, rhc_lqr_resp.inputs[0], '--', label='RHC + P_lqr')\n", "ax[1].plot(lqr_resp.time, lqr_resp.outputs[2], ':', label='LQR')" ] }, { "cell_type": "markdown", "id": "9497530b", "metadata": {}, "source": [ "## Discrete time RHC\n", "\n", "Many receding horizon control problems are solved based on a discrete time model. We show here how to implement this for a \"double integrator\" system, which in discrete time has the form\n", "\n", "$$\n", " x[k+1] = \\begin{bmatrix} 1 & 1 \\\\ 0 & 1 \\end{bmatrix} x[k] + \\begin{bmatrix} 0 \\\\ 1 \\end{bmatrix} \\text{clip}(u[k])\n", "$$\n" ] }, { "cell_type": "code", "execution_count": null, "id": "ae7cefa5", "metadata": {}, "outputs": [], "source": [ "#\n", "# System definition\n", "#\n", "\n", "def doubleint_update(t, x, u, params):\n", " # Get the parameters\n", " lb = params.get('lb', -1)\n", " ub = params.get('ub', 1)\n", " assert lb < ub\n", "\n", " # Get the sampling time\n", " dt = params.get('dt', 1)\n", "\n", " # bound the input\n", " u_clip = np.clip(u, lb, ub)\n", "\n", " return np.array([x[0] + dt * x[1], x[1] + dt * u_clip[0]])\n", "\n", "proc = ct.NonlinearIOSystem(\n", " doubleint_update, None, name=\"double integrator\",\n", " inputs = ['u'], outputs=['x[0]', 'x[1]'], states=2,\n", " params={'dt': 1}, dt=1)\n", "\n", "#\n", "# Linear quadratic regulator\n", "#\n", "\n", "# Define the cost functions to use\n", "Qx = np.diag([1, 0]) # state cost\n", "Qu = np.diag([1]) # input cost\n", "P1 = np.diag([0.1, 0.1]) # terminal cost\n", "\n", "# Get the LQR solution\n", "K, P, E = ct.dlqr(proc.linearize(0, 0), Qx, Qu)\n", "\n", "# Test out the LQR controller, with no constraints\n", "linsys = proc.linearize(0, 0)\n", "clsys_lin = ct.ss(linsys.A - linsys.B @ K, linsys.B, linsys.C, 0, dt=proc.dt)\n", "\n", "X0 = np.array([2, 1]) # initial conditions\n", "Tf = 10 # simulation time\n", "res = ct.initial_response(clsys_lin, Tf, X0=X0)\n", "\n", "# Plot the results\n", "plt.figure(1); plt.clf(); ax = plt.axes()\n", "ax.plot(res.time, res.states[0], 'k-', label='$x_1$')\n", "ax.plot(res.time, (-K @ res.states)[0], 'b-', label='$u$')\n", "\n", "# Test out the LQR controller with constraints\n", "clsys_lqr = ct.feedback(proc, -K, 1)\n", "tvec = np.arange(0, Tf, proc.dt)\n", "res_lqr_const = ct.input_output_response(clsys_lqr, tvec, 0, X0)\n", "\n", "# Plot the results\n", "ax.plot(res_lqr_const.time, res_lqr_const.states[0], 'k--', label='constrained')\n", "ax.plot(res_lqr_const.time, (-K @ res_lqr_const.states)[0], 'b--')\n", "ax.plot([0, 7], [-1, -1], 'k--', linewidth=0.75)\n", "\n", "# Adjust the limits for consistency\n", "ax.set_ylim([-4, 3.5])\n", "\n", "# Label the results\n", "ax.set_xlabel(\"Time $t$ [sec]\")\n", "ax.set_ylabel(\"State $x_1$, input $u$\")\n", "ax.legend(loc='lower right', labelspacing=0)\n", "plt.title(\"Linearized LQR response from x0\")" ] }, { "cell_type": "code", "execution_count": null, "id": "13cfc5d8", "metadata": {}, "outputs": [], "source": [ "#\n", "# Receding horizon controller\n", "#\n", "\n", "# Create the constraints\n", "traj_constraints = opt.input_range_constraint(proc, -1, 1)\n", "term_constraints = opt.state_range_constraint(proc, [0, 0], [0, 0])\n", "\n", "# Define the optimal control problem we want to solve\n", "T = 5\n", "timepts = np.arange(0, T * proc.dt, proc.dt)\n", "\n", "# Set up the optimal control problems\n", "ocp_orig = opt.OptimalControlProblem(\n", " proc, timepts,\n", " opt.quadratic_cost(proc, Qx, Qu),\n", " trajectory_constraints=traj_constraints,\n", " terminal_cost=opt.quadratic_cost(proc, P1, None),\n", ")\n", "\n", "ocp_lqr = opt.OptimalControlProblem(\n", " proc, timepts,\n", " opt.quadratic_cost(proc, Qx, Qu),\n", " trajectory_constraints=traj_constraints,\n", " terminal_cost=opt.quadratic_cost(proc, P, None),\n", ")\n", "\n", "ocp_low = opt.OptimalControlProblem(\n", " proc, timepts,\n", " opt.quadratic_cost(proc, Qx, Qu),\n", " trajectory_constraints=traj_constraints,\n", " terminal_cost=opt.quadratic_cost(proc, P/10, None),\n", ")\n", "\n", "ocp_high = opt.OptimalControlProblem(\n", " proc, timepts,\n", " opt.quadratic_cost(proc, Qx, Qu),\n", " trajectory_constraints=traj_constraints,\n", " terminal_cost=opt.quadratic_cost(proc, P*10, None),\n", ")\n", "weight_list = [P1, P, P/10, P*10]\n", "ocp_list = [ocp_orig, ocp_lqr, ocp_low, ocp_high]\n", "\n", "# Do a test run to figure out how long computation takes\n", "start_time = time.process_time()\n", "ocp_lqr.compute_trajectory(X0)\n", "stop_time = time.process_time()\n", "print(\"* Process time: %0.2g s\\n\" % (stop_time - start_time))\n", "\n", "# Create a figure to use for plotting\n", "fig, [[ax_orig, ax_lqr], [ax_low, ax_high]] = plt.subplots(2, 2)\n", "ax_list = [ax_orig, ax_lqr, ax_low, ax_high]\n", "ax_name = ['orig', 'lqr', 'low', 'high']\n", "\n", "# Generate the individual traces for the receding horizon control\n", "for ocp, ax, name, Pf in zip(ocp_list, ax_list, ax_name, weight_list):\n", " x, t = X0, 0\n", " for i in np.arange(0, Tf, proc.dt):\n", " # Calculate the optimal trajectory\n", " res = ocp.compute_trajectory(x, print_summary=False)\n", " soln = ct.input_output_response(proc, res.time, res.inputs, x)\n", "\n", " # Plot the results for this time instant\n", " ax.plot(res.time[:2] + t, res.inputs[0, :2], 'b-', linewidth=1)\n", " ax.plot(res.time[:2] + t, soln.outputs[0, :2], 'k-', linewidth=1)\n", " \n", " # Plot the results projected forward\n", " ax.plot(res.time[1:] + t, res.inputs[0, 1:], 'b--', linewidth=0.75)\n", " ax.plot(res.time[1:] + t, soln.outputs[0, 1:], 'k--', linewidth=0.75)\n", " \n", " # Update the state to use for the next time point\n", " x = soln.states[:, 1]\n", " t += proc.dt\n", "\n", " # Adjust the limits for consistency\n", " ax.set_ylim([-1.5, 3.5])\n", "\n", " # Label the results\n", " ax.set_xlabel(\"Time $t$ [sec]\")\n", " ax.set_ylabel(\"State $x_1$, input $u$\")\n", " ax.set_title(f\"MPC response for {name}\")\n", " plt.tight_layout()" ] }, { "cell_type": "markdown", "id": "015dc953", "metadata": {}, "source": [ "We can also implement a receding horizon controller for a discrete time system using opt.create_mpc_iosystem. This creates a controller that accepts the current state as the input and generates the control to apply from that state." ] }, { "cell_type": "code", "execution_count": null, "id": "4f8bb594", "metadata": {}, "outputs": [], "source": [ "# Construct using create_mpc_iosystem\n", "clsys = opt.create_mpc_iosystem(\n", " proc, timepts, opt.quadratic_cost(proc, Qx, Qu), traj_constraints,\n", " terminal_cost=opt.quadratic_cost(proc, P1, None), \n", ")\n", "print(clsys)" ] }, { "cell_type": "markdown", "id": "f1b08fb4", "metadata": {}, "source": [ "(This function needs some work to be more user-friendly, e.g. renaming of the inputs and outputs.)" ] }, { "cell_type": "code", "execution_count": null, "id": "d2afd287", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 5 }