Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev 1.6.0 webank 1.7.0 #549

Merged
merged 11 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import org.apache.linkis.common.conf.CommonVars

import org.apache.commons.lang3.StringUtils

import java.util.Locale

import scala.collection.mutable

object CodeAndRunTypeUtils {
private val CONF_LOCK = new Object()

Expand Down Expand Up @@ -101,14 +105,23 @@ object CodeAndRunTypeUtils {
def getLanguageTypeAndCodeTypeRelationMap: Map[String, String] = {
val codeTypeAndRunTypeRelationMap = getCodeTypeAndLanguageTypeRelationMap
if (codeTypeAndRunTypeRelationMap.isEmpty) Map()
else codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
else {
// codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
val map = mutable.Map[String, String]()
codeTypeAndRunTypeRelationMap.foreach(kv => {
kv._2.foreach(v => map.put(v, kv._1))
})
map.toMap
}
}

def getLanguageTypeByCodeType(codeType: String, defaultLanguageType: String = ""): String = {
if (StringUtils.isBlank(codeType)) {
return ""
}
getLanguageTypeAndCodeTypeRelationMap.getOrElse(codeType, defaultLanguageType)
val lowerCaseCodeType = codeType.toLowerCase(Locale.getDefault)
getLanguageTypeAndCodeTypeRelationMap.getOrElse(lowerCaseCodeType, defaultLanguageType)

}

/**
Expand Down
7 changes: 7 additions & 0 deletions linkis-commons/linkis-module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,13 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-openfeign-core</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.3.4</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.springframework.core.env.Environment;
import org.springframework.core.env.PropertySource;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.web.filter.CharacterEncodingFilter;

import javax.servlet.DispatcherType;
Expand All @@ -68,6 +69,7 @@
@EnableDiscoveryClient
@RefreshScope
@EnableFeignClients
@EnableRetry
public class DataWorkCloudApplication extends SpringBootServletInitializer {
private static final Log logger = LogFactory.getLog(DataWorkCloudApplication.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-bml-client</artifactId>
<artifactId>linkis-pes-client</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,37 @@

package org.apache.linkis.computation.client

import org.apache.linkis.computation.client.interactive.InteractiveJob
import org.apache.linkis.computation.client.once.OnceJob
import org.apache.linkis.bml.client.BmlClientFactory
import org.apache.linkis.computation.client.interactive.{InteractiveJob, InteractiveJobBuilder}
import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob}
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder}
import org.apache.linkis.httpclient.dws.config.DWSClientConfig
import org.apache.linkis.ujes.client.UJESClientImpl

import java.io.Closeable

class LinkisJobClient(clientConfig: DWSClientConfig) extends Closeable {

private val ujseClient = new UJESClientImpl(clientConfig)

private lazy val linkisManagerCLient = LinkisManagerClient(ujseClient)

override def close(): Unit = {
if (null != linkisManagerCLient) {
linkisManagerCLient.close()
}
}

def onceJobBuilder(): SimpleOnceJobBuilder =
SimpleOnceJob.builder(SimpleOnceJobBuilder.getBmlClient(clientConfig), linkisManagerCLient)

def interactiveJobBuilder(): InteractiveJobBuilder = {
val builder = InteractiveJob.builder()
builder.setUJESClient(ujseClient)
}

}

/**
* This class is only used to provide a unified entry for user to build a LinkisJob conveniently and
* simply. Please keep this class lightweight enough, do not set too many field to confuse user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.linkis.computation.client.once.simple

import org.apache.linkis.bml.client.BmlClient
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobMetrics
Expand Down Expand Up @@ -157,6 +158,11 @@ object SimpleOnceJob {

def builder(): SimpleOnceJobBuilder = new SimpleOnceJobBuilder

def builder(
bmlClient: BmlClient,
linkisManagerClient: LinkisManagerClient
): SimpleOnceJobBuilder = new SimpleOnceJobBuilder(bmlClient, linkisManagerClient)

/**
* Build a submitted SimpleOnceJob by id and user.
* @param id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.linkis.computation.client.once.simple
import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.LinkisJobBuilder.clientConfig
import org.apache.linkis.computation.client.once.LinkisManagerClient
import org.apache.linkis.computation.client.once.action.CreateEngineConnAction
import org.apache.linkis.computation.client.once.simple.SimpleOnceJobBuilder._
Expand All @@ -28,6 +29,8 @@ import org.apache.linkis.governance.common.entity.job.OnceExecutorContent
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils.BmlResource
import org.apache.linkis.httpclient.dws.DWSHttpClient
import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.ujes.client.exception.UJESJobException
Expand All @@ -38,12 +41,19 @@ import java.util
import scala.collection.convert.WrapAsJava._
import scala.collection.convert.WrapAsScala._

class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[SubmittableSimpleOnceJob] {
class SimpleOnceJobBuilder private[simple] (
private val bmlClient: BmlClient,
private val linkisManagerClient: LinkisManagerClient
) extends LinkisJobBuilder[SubmittableSimpleOnceJob] {

private var createService: String = _
private var maxSubmitTime: Long = _
private var description: String = _

def this() = {
this(null, null)
}

def setCreateService(createService: String): this.type = {
this.createService = createService
this
Expand All @@ -69,10 +79,26 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab
val contentMap = OnceExecutorContentUtils.contentToMap(onceExecutorContent)
val bytes = DWSHttpClient.jacksonJson.writeValueAsBytes(contentMap)
val response =
getBmlClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes))
getThisBMLClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes))
OnceExecutorContentUtils.resourceToValue(BmlResource(response.resourceId, response.version))
}

protected def getThisBMLClient(): BmlClient = {
if (null == this.bmlClient) {
getBmlClient(LinkisJobBuilder.getDefaultClientConfig)
} else {
this.bmlClient
}
}

protected def getThisLinkisManagerClient(): LinkisManagerClient = {
if (null == this.linkisManagerClient) {
getLinkisManagerClient
} else {
this.linkisManagerClient
}
}

override def build(): SubmittableSimpleOnceJob = {
ensureNotNull(labels, "labels")
ensureNotNull(jobContent, "jobContent")
Expand All @@ -99,7 +125,7 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab
.setMaxSubmitTime(maxSubmitTime)
.setDescription(description)
.build()
new SubmittableSimpleOnceJob(getLinkisManagerClient, createEngineConnAction)
new SubmittableSimpleOnceJob(getThisLinkisManagerClient, createEngineConnAction)
}

implicit def toMap(map: util.Map[String, Any]): util.Map[String, String] = map.map {
Expand Down Expand Up @@ -128,10 +154,27 @@ object SimpleOnceJobBuilder {
private var bmlClient: BmlClient = _
private var linkisManagerClient: LinkisManagerClient = _

def getBmlClient: BmlClient = {
def getBmlClient(clientConfig: DWSClientConfig): BmlClient = {
if (bmlClient == null) synchronized {
if (bmlClient == null) {
bmlClient = BmlClientFactory.createBmlClient(LinkisJobBuilder.getDefaultClientConfig)
val newClientConfig = DWSClientConfigBuilder
.newBuilder()
.addServerUrl(clientConfig.getServerUrl)
.connectionTimeout(clientConfig.getConnectTimeout)
.discoveryEnabled(clientConfig.isDiscoveryEnabled)
.loadbalancerEnabled(clientConfig.isLoadbalancerEnabled)
.maxConnectionSize(clientConfig.getMaxConnection)
.retryEnabled(clientConfig.isRetryEnabled)
.setRetryHandler(clientConfig.getRetryHandler)
.readTimeout(
clientConfig.getReadTimeout
) // We think 90s is enough, if SocketTimeoutException is throw, just set a new clientConfig to modify it.
.setAuthenticationStrategy(new TokenAuthenticationStrategy())
.setAuthTokenKey(TokenAuthenticationStrategy.TOKEN_KEY)
.setAuthTokenValue("LINKIS-AUTH-eTaYLbQpmIulPyrXcMl")
.setDWSVersion(clientConfig.getDWSVersion)
.build()
bmlClient = BmlClientFactory.createBmlClient(newClientConfig)
Utils.addShutdownHook(() => bmlClient.close())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,13 @@ object GovernanceCommonConf {
val EC_APP_MANAGE_MODE =
CommonVars("linkis.ec.app.manage.mode", "attach")

/**
* DEFAULT_LOGPATH_PREFIX is the prefix that represents the default log storage path
* DEFAULT_LOGPATH_PREFIX 是表示默认的日志存储路径的前缀 和 结果集的前缀
*/
val DEFAULT_LOGPATH_PREFIX = CommonVars[String](
"wds.linkis.entrance.config.log.path",
CommonVars[String]("wds.linkis.filesystem.hdfs.root.path").getValue
).getValue

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.commons.lang3.StringUtils

import java.io.File
import java.text.SimpleDateFormat
import java.util
import java.util.{ArrayList, List}
import java.util.{ArrayList, Date, List}

object GovernanceUtils extends Logging {

Expand Down Expand Up @@ -121,4 +122,26 @@ object GovernanceUtils extends Logging {
}
}

/**
* get result path parentPath: resPrefix + dateStr + result + creator subPath: parentPath +
* executeUser + taskid + filename
*
* @param creator
* @return
*/
def getResultParentPath(creator: String): String = {
val resPrefix = GovernanceCommonConf.DEFAULT_LOGPATH_PREFIX
val resStb = new StringBuilder()
if (resStb.endsWith("/")) {
resStb.append(resPrefix)
} else {
resStb.append(resPrefix).append("/")
}
val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
val date = new Date(System.currentTimeMillis)
val dateString = dateFormat.format(date)
resStb.append("result").append("/").append(dateString).append("/").append(creator)
resStb.toString()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-udf-client</artifactId>
<artifactId>linkis-pes-rpc-client</artifactId>
<version>${project.version}</version>
</dependency>

Expand All @@ -55,7 +55,7 @@

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-bml-client</artifactId>
<artifactId>linkis-pes-client</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-bml-client</artifactId>
<artifactId>linkis-pes-client</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-udf-client</artifactId>
<artifactId>linkis-pes-rpc-client</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down Expand Up @@ -60,18 +60,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-cs-client</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-computation-governance-common</artifactId>
Expand All @@ -80,7 +68,7 @@

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-bml-client</artifactId>
<artifactId>linkis-pes-client</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ abstract class AsyncConcurrentComputationExecutor(override val outputPrintLimit:
})
} { e =>
logger.info("failed to do with hook", e)
engineExecutionContext.appendStdout(
LogUtils.generateWarn(s"failed execute hook: ${ExceptionUtils.getStackTrace(e)}")
)
}
if (hookedCode.length > 100) {
logger.info(s"hooked after code: ${hookedCode.substring(0, 100)} ....")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
EngineConnObject.getEngineCreationContext.getOptions.asScala.foreach({ case (key, value) =>
// skip log jobId because it corresponding jobid when the ec created
if (!ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue.contains(key)) {
sb.append(s"${key}=${value.toString}\n")
sb.append(s"${key}=${value}\n")
}
})

Expand Down
Loading
Loading