diff --git a/application/src/main/java/com/hashmapinc/server/actors/computation/SparkComputationJobActorMessageProcessor.java b/application/src/main/java/com/hashmapinc/server/actors/computation/SparkComputationJobActorMessageProcessor.java
index 857b76d22..7aadec9b2 100644
--- a/application/src/main/java/com/hashmapinc/server/actors/computation/SparkComputationJobActorMessageProcessor.java
+++ b/application/src/main/java/com/hashmapinc/server/actors/computation/SparkComputationJobActorMessageProcessor.java
@@ -61,6 +61,7 @@ public class SparkComputationJobActorMessageProcessor extends ComponentMsgProces
     private ObjectMapper objectMapper = new ObjectMapper();
     private final ActorRef self;
     private final ActorRef parent;
+    private final String mainClass = "com.hashmapinc.tempus.Computation";
 
     protected SparkComputationJobActorMessageProcessor(TenantId tenantId, ComputationJobId id, ActorSystemContext systemContext
             , LoggingAdapter logger, ActorRef parent, ActorRef self, Computations computation) {
@@ -217,7 +218,7 @@ private String buildSparkComputationRequest() throws IOException {
         logger.info("Jar name is {}, main class is {}, arg parameters are {}, location is {}", md.getJarName(), md.getMainClass(), md.getArgsformat(), systemContext.getComputationLocation());
         SparkComputationRequest.SparkComputationRequestBuilder builder = SparkComputationRequest.builder();
         builder.file(systemContext.getComputationLocation() + md.getJarName());
-        builder.className(md.getMainClass());
+        builder.className(mainClass);
         builder.args(args());
         SparkComputationRequest sparkComputationRequest = builder.build();
         return objectMapper.writeValueAsString(sparkComputationRequest);
@@ -239,10 +240,40 @@ private String[] args() {
                 }
             }
         }
+        if(md.getArgsType().equals(ArgType.NAMED)){
+            args.add("--computation-class");
+            args.add(md.getMainClass());
+            args.add("--source");
+            String source = conf.get("source").asText();
+            args.add(source);
+            if("kinesis".equalsIgnoreCase(source)){
+                args.addAll(kinesisParameters(conf));
+            }else{
+                args.addAll(kafkaParameters(conf));
+            }
+        }
         logger.info("Argument array list to spark job " + args);
         return args.toArray(new String[args.size()]);
     }
 
+    private List kinesisParameters(JsonNode conf){
+        List kinesisArgs = new ArrayList();
+        kinesisArgs.add("--kinesisStreamName");
+        kinesisArgs.add(conf.get("kinesisStreamName").asText());
+        kinesisArgs.add("--kinesisRegion");
+        kinesisArgs.add(conf.get("kinesisRegion").asText());
+        return kinesisArgs;
+    }
+
+    private List kafkaParameters(JsonNode conf){
+        List kafkaArgs = new ArrayList();
+        kafkaArgs.add("--kafkaTopic");
+        kafkaArgs.add(conf.get("kafkaTopic").asText());
+        kafkaArgs.add("--kafkaUrl");
+        kafkaArgs.add(conf.get("kafkaUrl").asText());
+        return kafkaArgs;
+    }
+
     private void stopJobOnServer(){
         SparkComputationJob configuration = (SparkComputationJob) job.getConfiguration();
         if(configuration.getJobId() != null){
diff --git a/ui/src/app/components/json-form.directive.js b/ui/src/app/components/json-form.directive.js
index db6325f07..b12af5556 100644
--- a/ui/src/app/components/json-form.directive.js
+++ b/ui/src/app/components/json-form.directive.js
@@ -83,6 +83,7 @@ function JsonForm($compile, $templateCache, $mdColorPicker) {
                     val = undefined;
                 }
                 selectOrSet(key, scope.model, val);
+                scope.formProps.model = scope.model;
             },
             onColorClick: function(event, key, val) {
                 scope.showColorPicker(event, val);
diff --git a/ui/src/app/components/react/json-form-rc-select.jsx b/ui/src/app/components/react/json-form-rc-select.jsx
index 4efea8422..c0c634244 100644
--- a/ui/src/app/components/react/json-form-rc-select.jsx
+++ b/ui/src/app/components/react/json-form-rc-select.jsx
@@ -92,7 +92,7 @@ class TempusRcSelect extends React.Component {
         }
 
         return (
-
+
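Below is a minimal, standalone sketch (not code from this repository) of how the NAMED-args branch added above assembles the Spark submission arguments. It mirrors the patched args()/kinesisParameters()/kafkaParameters() logic on the assumption that conf is the computation job's JSON configuration; the class NamedArgsSketch, the example main class com.example.MyComputation, and the sample stream name and region are hypothetical.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

public class NamedArgsSketch {

    // Simplified mirror of the NAMED-args branch: the fixed launcher class is
    // submitted to Spark as className, while the user's main class and the
    // source settings are forwarded as named CLI arguments.
    static List<String> namedArgs(String userMainClass, JsonNode conf) {
        List<String> args = new ArrayList<>();
        args.add("--computation-class");
        args.add(userMainClass);
        args.add("--source");
        String source = conf.get("source").asText();
        args.add(source);
        if ("kinesis".equalsIgnoreCase(source)) {
            args.add("--kinesisStreamName");
            args.add(conf.get("kinesisStreamName").asText());
            args.add("--kinesisRegion");
            args.add(conf.get("kinesisRegion").asText());
        } else {
            args.add("--kafkaTopic");
            args.add(conf.get("kafkaTopic").asText());
            args.add("--kafkaUrl");
            args.add(conf.get("kafkaUrl").asText());
        }
        return args;
    }

    public static void main(String[] argv) throws Exception {
        // Hypothetical configuration values for illustration only.
        String json = "{\"source\":\"kinesis\","
                + "\"kinesisStreamName\":\"telemetry-stream\","
                + "\"kinesisRegion\":\"us-east-1\"}";
        JsonNode conf = new ObjectMapper().readTree(json);
        // Prints: [--computation-class, com.example.MyComputation, --source, kinesis,
        //          --kinesisStreamName, telemetry-stream, --kinesisRegion, us-east-1]
        System.out.println(namedArgs("com.example.MyComputation", conf));
    }
}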