Commit 6cdddedb authored by charlie-ablett's avatar charlie-ablett
Browse files

Standalone (single-step) execution

parent 499f14a3
......@@ -28,10 +28,9 @@ module Api
end
def download_input_zip
zip_path = process_chain.assemble_input_file_zip
ap "Assembled #{zip_path}"
process_chain.assemble_input_file_zip
send_file(zip_path,
send_file(process_chain.zip_path,
:disposition => 'attachment',
:url_based_filename => true)
end
......@@ -68,8 +67,8 @@ module Api
end
def authorise_account!
if process_chain.account != current_entity.account
e = ExecutionErrors::NotAuthorisedError.new("That recipe is not accessible to you.")
if(process_chain.account != current_entity.account) && !current_entity.account.admin?
e = ExecutionErrors::NotAuthorisedError.new("That process chain is not accessible to you.")
render_error(e)
end
end
......
......@@ -105,7 +105,7 @@ module Api
def execution_params
ex_params = params[:execution_parameters] || {}
if ex_params.is_a?(String)
return JSON.parse(ex_params)
JSON.parse(ex_params)
else
ex_params
end
......
module Api
module V1
class SingleStepExecutionsController < ApplicationController
include ExecutionErrors
include DirectoryMethods
before_action :authenticate!
before_action :authorise_account!, only: [:show, :download_input_file, :download_input_zip, :download_output_file, :download_output_zip]
respond_to :json
def show
render json: single_step_execution.as_json
end
def create
new_single_step_execution[:execution_parameters] = params[:single_step_execution][:execution_parameters]
new_single_step_execution.save!
new_single_step_execution.start_execution!(code: params[:single_step_execution][:code])
render json: new_single_step_execution
rescue => e
ap e.message
ap e.backtrace
render_error(e)
end
def index
render json: current_entity.account.single_step_executions.as_json
end
def zip_path
"/tmp/standalone_#{id}_input.zip"
end
def download_input_file
ap "Downloading #{params[:relative_path]}..."
file_path = assemble_file_path(location: single_step_execution.input_files_directory, relative_path: params[:relative_path])
send_file(file_path,
:disposition => 'attachment',
:url_based_filename => true)
end
def download_input_zip
zip_path = single_step_execution.assemble_input_file_zip
ap "Assembled #{zip_path}"
send_file(zip_path,
:disposition => 'attachment',
:url_based_filename => true)
end
def download_output_file
ap "Downloading #{params[:relative_path]} from #{single_step_execution.working_directory}..."
unless single_step_execution.finished?
render_not_found_error("Not finished processing yet")
return
end
file_path = assemble_file_path(location: single_step_execution.working_directory, relative_path: params[:relative_path])
send_file(file_path,
:disposition => 'attachment',
:url_based_filename => true)
end
def download_output_zip
unless single_step_execution.finished?
render_not_found_error("Not finished processing yet")
return
end
zip_path = single_step_execution.assemble_output_file_zip
ap "Assembled #{zip_path}"
send_file(zip_path,
:disposition => 'attachment',
:url_based_filename => true)
end
private
def single_step_execution_params
params.require(:single_step_execution).permit(:description, :step_class_name, :execution_parameters, :input_file_list)
end
def single_step_execution
@single_step_execution ||= current_entity.account.single_step_executions.find(params[:id])
end
def single_step_executions
@single_step_execution ||= current_entity.account.single_step_executions
end
def new_single_step_execution
@single_step_execution ||= current_entity.account.single_step_executions.new(single_step_execution_params)
end
def authorise_account!
if single_step_execution.account != current_entity.account || !current_entity.account.admin?
e = ExecutionErrors::NotAuthorisedError.new("This is not accessible to you.")
render_error(e)
end
end
end
end
end
\ No newline at end of file
module DownloadableMethods
def open_input_files
recursive_file_list(input_files_directory).inject([]) do |list, file|
list << UploadedFile.new(input_files_directory: input_files_directory, relative_path: file)
list
end
end
def save_input_file_manifest!
self.input_file_list = assemble_manifest(directory: input_files_directory)
save!
end
def input_file_manifest
unless input_file_list.present?
save_input_file_manifest!
end
input_file_list
end
def assemble_input_file_zip
Dir.chdir(input_files_directory) do
unless File.exists?(zip_path)
`zip -r "#{zip_path}" *`
end
end
rescue => e
ap e.message
ap e.backtrace
end
def output_file_manifest
if respond_to?(:last_step)
return last_step.output_file_manifest
end
if !finished?
[]
elsif output_file_list.present?
output_file_list
elsif File.exists?(working_directory)
assemble_manifest(directory: working_directory)
else
# @TODO flag an error to admin!
ap "Cannot find file location for #{self.class.name} id '#{self.id}'"
ap "Looking in #{working_directory}"
[]
end
end
def assemble_output_file_zip
if respond_to?(:last_step)
return last_step.assemble_output_file_zip
end
zip_path = "/tmp/step_#{id}_output.zip"
Dir.chdir(working_directory) do
unless File.exists?(zip_path)
`zip -r "#{zip_path}" *`
end
end
zip_path
end
end
\ No newline at end of file
......@@ -72,4 +72,22 @@ module EventConstants
'process_step_completed'
end
# standalone execution events
def standalone_execution_channel(account_id)
"#{account_id}_standalone_execution"
end
def standalone_execution_started_event(account_id)
"#{account_id}_standalone_execution_started"
end
def standalone_execution_finished_event(account_id)
"#{account_id}_standalone_execution_completed"
end
def standalone_execution_error_event
'standalone_processing_error'
end
end
\ No newline at end of file
require 'yaml'
module Execution
class StandaloneExecutionRunner
include EventConstants
include DirectoryMethods
include ExecutionErrors
attr_accessor :working_directory, :klass
def initialize(klass:, single_step_execution:)
# load behaviour class
@klass = klass
@standalone_execution = single_step_execution
@working_directory = @standalone_execution.working_directory
end
def run!
trigger_event(channels: execution_channel, event: standalone_execution_started_event(@standalone_execution.account_id), data: { single_step_execution_id: @standalone_execution.id})
@standalone_execution.update_attribute(:executed_at, Time.zone.now)
execute_standalone_step
trigger_event(channels: execution_channel, event: standalone_execution_finished_event(@standalone_execution.account_id), data: { output_file_manifest: @standalone_execution.output_file_manifest })
rescue => e
error = e
trigger_event(channels: execution_channel, event: standalone_execution_error_event, data: { output_file_manifest: @standalone_execution.output_file_manifest, error: e.message})
ensure
@standalone_execution.update_attribute(:finished_at, Time.now)
if error
raise error
end
end
def execute_standalone_step
behaviour_step = klass.new(chain_file_location: working_directory, position: 1)
trigger_step_started_event(behaviour_step)
begin
behaviour_step.combined_parameters = @standalone_execution.execution_parameters
behaviour_step.execute
# rescue => e
# log(e.message)
# log(e.backtrace)
ensure
process_step.map_results(behaviour_step: behaviour_step)
trigger_standalone_execution_finished_event(behaviour_step, process_step)
end
end
def trigger_step_started_event(behaviour_step)
trigger_event(channels: standalone_execution_channel(@standalone_execution.account_id),
event: standalone_execution_started_event(@standalone_execution.account_id),
data: { position: behaviour_step.position })
end
def trigger_standalone_execution_finished_event(behaviour_step, process_step)
trigger_event(channels: standalone_execution_channel(@standalone_execution.account_id),
event: standalone_execution_finished_event(@standalone_execution.account_id),
data: { position: behaviour_step.position,
successful: behaviour_step.successful,
notes: behaviour_step.notes,
execution_errors: behaviour_step.errors,
process_log_location: process_step.process_log_file_name,
output_file_manifest: process_step.output_file_manifest })
end
end
end
class NilClass
def new
nil
end
end
......@@ -4,6 +4,8 @@ module ObjectMethods
mod.const_get(class_name)
end
rescue => e
ap e.message
ap e.backtrace
nil
end
end
\ No newline at end of file
......@@ -14,6 +14,8 @@ class Account < ApplicationRecord
has_many :account_roles, inverse_of: :account
has_many :recipe_favourites, inverse_of: :account
has_many :recipe_step_presets, inverse_of: :account
has_many :single_step_executions, inverse_of: :account
has_one :service
def roles
......
......@@ -3,23 +3,12 @@ require 'yaml'
require 'ink_step/mixins/helper_methods'
require 'constants'
# create_table "process_chains", force: :cascade do |t|
# t.integer "account_id", null: false
# t.datetime "executed_at"
# t.string "input_file"
# t.integer "recipe_id", null: false
# t.datetime "created_at", null: false
# t.datetime "updated_at", null: false
# t.datetime "finished_at"
# t.string "slug"
# t.text "input_file_manifest"
# end
class ProcessChain < ApplicationRecord
include ExecutionErrors
include SlugMethods
include DirectoryMethods
include EventConstants
include DownloadableMethods
serialize :input_file_list
......@@ -114,42 +103,7 @@ class ProcessChain < ApplicationRecord
File.join(working_directory, Constants::INPUT_FILE_DIRECTORY_NAME)
end
def open_input_files
recursive_file_list(input_files_directory).inject([]) do |list, file|
list << UploadedFile.new(input_files_directory: input_files_directory, relative_path: file)
list
end
end
def save_input_file_manifest!
self.input_file_list = assemble_manifest(directory: input_files_directory)
save!
end
def input_file_manifest
unless input_file_list.present?
save_input_file_manifest!
end
input_file_list
end
def output_file_manifest
last_step.output_file_manifest
rescue => e
nil
end
def assemble_output_file_zip
last_step.assemble_output_file_zip
end
def assemble_input_file_zip
zip_path = "/tmp/chain_#{id}_input.zip"
Dir.chdir(input_files_directory) do
unless File.exists?(zip_path)
`zip -r "#{zip_path}" *`
end
end
zip_path
def zip_path
"/tmp/chain_#{id}_input.zip"
end
end
\ No newline at end of file
# create_table "process_steps", force: :cascade do |t|
# t.integer "process_chain_id", null: false
# t.integer "position", null: false
# t.text "notes"
# t.datetime "executed_at"
# t.text "execution_errors"
# t.datetime "created_at", null: false
# t.datetime "updated_at", null: false
# t.string "step_class_name", null: false
# t.string "version"
# t.datetime "started_at"
# t.datetime "finished_at"
# t.text "output_file_list"
# t.boolean "successful"
# t.json "execution_parameters", default: {}, null: false
# end
class ProcessStep < ApplicationRecord
include ObjectMethods
include DirectoryMethods
include DownloadableMethods
serialize :output_file_list
......@@ -31,21 +15,6 @@ class ProcessStep < ApplicationRecord
class_from_string(step_class_name)
end
def output_file_manifest
if !finished?
[]
elsif output_file_list.present?
output_file_list
elsif File.exists?(working_directory)
assemble_manifest(directory: working_directory)
else
# @TODO flag an error to admin!
ap "Cannot find file location for process step id '#{self.id}', chain id '#{process_chain_id}' and recipe id '#{process_chain.recipe_id}'"
ap "Looking in #{working_directory}"
[]
end
end
def working_directory
File.join(process_chain.working_directory, position.to_s)
end
......@@ -62,16 +31,6 @@ class ProcessStep < ApplicationRecord
!!started_at
end
def assemble_output_file_zip
zip_path = "/tmp/step_#{id}_output.zip"
Dir.chdir(working_directory) do
unless File.exists?(zip_path)
`zip -r "#{zip_path}" *`
end
end
zip_path
end
def process_log_file_name
"process_step_#{self.id}.log"
end
......
class SingleStepExecution < ApplicationRecord
include ObjectMethods
include DirectoryMethods
include SlugMethods
include DownloadableMethods
serialize :output_file_list
belongs_to :account, inverse_of: :single_step_executions
validates_presence_of :account, :description
before_save :generate_unique_slug
# def output_file_manifest
# if !finished?
# []
# elsif output_file_list.present?
# output_file_list
# elsif File.exists?(working_directory)
# assemble_manifest(directory: working_directory)
# else
# # @TODO flag an error to admin!
# # ap "Cannot find file location for process step id '#{self.id}', chain id '#{process_chain_id}' and recipe id '#{process_chain.recipe_id}'"
# ap "Looking in #{working_directory}"
# []
# end
# end
def working_directory
File.join(Constants::FILE_LOCATION, "single_step_executions", slug)
end
def start_execution!(code:, callback_url: nil)
initialize_directories
write_code_to_file(code: code)
StandaloneExecutionWorker.perform_async(self.id, callback_url)
end
def code_file_name
"#{step_class_name.split("::").last.underscore}.rb"
end
def code_file_location
File.join(working_directory, code_file_name)
end
def write_code_to_file(code:)
the_file = File.new(code_file_location, 'w+')
the_file.write(code)
the_file.close
ap "wrote code to #{code_file_location}"
end
def output_files_location
working_directory
end
def finished?
!!finished_at
end
def started?
!!executed_at
end
def step_class
class_from_string(step_class_name)
end
def initialize_directories
create_directory_if_needed(Constants::FILE_LOCATION)
create_directory_if_needed(working_directory)
create_directory_if_needed(input_files_directory)
create_directory_if_needed(output_files_directory)
end
def input_files_directory
File.join(working_directory, Constants::INPUT_FILE_DIRECTORY_NAME)
end
def output_files_directory
File.join(working_directory, Constants::OUTPUT_FILE_DIRECTORY_NAME)
end
def assemble_output_file_zip
zip_path = "/tmp/step_#{id}_output.zip"
Dir.chdir(working_directory) do
unless File.exists?(zip_path)
`zip -r "#{zip_path}" *`
end
end
zip_path
end
def process_log_file_name
"process_step_#{self.id}.log"
end
def map_results(behaviour_step:)
self.execution_errors = [behaviour_step.errors].flatten.map{|line| line.gsub(working_directory, "$process_step_working_directory")}
self.notes = [behaviour_step.notes].flatten.map{|line| line.gsub(working_directory, "$process_step_working_directory")}
self.executed_at = behaviour_step.executed_at
self.finished_at = behaviour_step.finished_at
self.successful = behaviour_step.successful
self.output_file_list = behaviour_step.semantically_tagged_manifest
self.process_log = behaviour_step.process_log
save!
end
end
\ No newline at end of file
require "yaml"
class SingleStepExecutionSerializer < ActiveModel::Serializer
attributes :id, :account_id, :description, :description, :slug, :notes, :execution_errors,
:output_file_manifest, :input_file_manifest, :executed_at, :finished_at, :successful,
:execution_parameters, :process_log_location, :errors
def execution_parameters
object.execution_parameters || {}
end
def process_log_location
object.process_log_file_name
end
def executed_at
object.executed_at.nil? ? nil : object.executed_at.iso8601
end
def finished_at
object.finished_at.nil? ? nil : object.finished_at.iso8601
end
def execution_errors
return "" if object.execution_errors.nil?
errors = [YAML::load(object.execution_errors)].flatten
errors.join(", ").gsub(/\n/, "")
end
def notes
return "" if object.notes.nil?
notes = [YAML::load(object.notes)].flatten
notes.join(", ").gsub(/\n/, "")
end
end
$main_binding = binding
require 'httparty'
require 'open3'
class StandaloneExecutionWorker