import re

# A phone number is accepted when it is a leading 1, a second digit from
# {3, 4, 5, 7, 8}, and nine more digits. The second-digit set is written
# without '|' separators: inside a character class '|' is a literal, so the
# former "[3|4|5|7|8]" also accepted values such as "1|123456789".
# Compiled once at import time rather than once per message.
PHONE_PATTERN = re.compile(r'^1[34578][0-9]{9}$')

# Captures the first three and last four digits so the middle four can be
# replaced with asterisks.
_MASK_PATTERN = re.compile(r"(\d{3})\d{4}(\d{4})")


def main_handler(event, context):
    """Mask the middle four digits of every valid phone number in the batch.

    The CKafka Connector delivers an event shaped like::

        {
            "event": {
                "data": [msg1, msg2, msg3]
            }
        }

    In this demo each message is a plain string, for example::

        {
            "event": {
                "data": ["13911112222", "13122223333", "2345678"]
            }
        }

    Args:
        event: Dict holding the message list under ``event['event']['data']``.
        context: Invocation context; not used by this handler.

    Returns:
        ``{'result': 'Succeed', 'data': [...]}`` where ``data`` has one entry
        per input message, in the same order. Valid phone numbers are masked
        (``'13911112222'`` becomes ``'139****2222'``); every other message is
        passed through unchanged after a log line is printed.

    Raises:
        KeyError: If ``event`` lacks the ``event``/``data`` keys.
        TypeError: If a message is not a string.
    """
    print('start main_handler')
    data = event['event']['data']
    print('the length of msg body is [%s]' % len(data))
    messages = []
    for message in data:
        if PHONE_PATTERN.match(message):
            # Valid phone number: replace the middle four digits with asterisks.
            message = _MASK_PATTERN.sub(r"\1****\2", message)
        else:
            print('[%s] is not a valid phone number' % message)
        messages.append(message)
    # If processing fails, an error body such as {"result": "Failed"} can be
    # returned instead.
    return {
        'result': 'Succeed',
        'data': messages
    }


if __name__ == '__main__':
    event = {
        "event": {
            "data": [
                "13911112222",
                "13122223333",
                "2345678"
            ]
        }
    }
    result = main_handler(event, {})
    print(result)
a/Python3.6-CkafkaConnectorDataEnrichment/serverless.yml b/Python3.6-CkafkaConnectorDataEnrichment/serverless.yml new file mode 100755 index 000000000..cc1c8d561 --- /dev/null +++ b/Python3.6-CkafkaConnectorDataEnrichment/serverless.yml @@ -0,0 +1,22 @@ +component: scf +name: ap-guangzhou_default_CkafkaConnectorDataDesensitization +inputs: + name: CkafkaConnectorDataDesensitization + src: ./src + handler: index.main_handler + runtime: Python3.6 + namespace: default + region: ap-guangzhou + memorySize: 128 + timeout: 30 + environment: + - key: dbhost + value: db host + - key: dbport + value: db port + - key: dbuser + value: db user name + - key: dbpwd + value: db password + - key: dbname + value: db name \ No newline at end of file diff --git a/Python3.6-CkafkaConnectorDataEnrichment/src/index.py b/Python3.6-CkafkaConnectorDataEnrichment/src/index.py new file mode 100755 index 000000000..04e657c84 --- /dev/null +++ b/Python3.6-CkafkaConnectorDataEnrichment/src/index.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python +# -*- coding: utf8 -*- + + +import base64 +import json +import logging +import os +import re +import time + +import pymysql + + +logger = logging.getLogger() +# Need to add the appropriate configuration in environment variables. 
# The connection settings below are read from environment variables, which
# must be configured on the function before it runs.
DB_HOST = os.getenv('dbhost')  # database host
# os.getenv returns a string while pymysql expects an integer port; fall back
# to the MySQL default port when the variable is unset or empty.
DB_PORT = int(os.getenv('dbport') or 3306)  # database port
DB_USER = os.getenv('dbuser')  # database user name
DB_USER_PASSWORD = os.getenv('dbpwd')  # database password
DB_NAME = os.getenv('dbname')  # database name


class DBSource(object):
    """MySQL database used as the lookup source for data enrichment."""

    def __init__(self):
        """Open the connection using the module-level DB_* settings.

        Raises:
            Exception: Whatever pymysql raised while connecting; it is logged
                and re-raised with its original type and traceback.
        """
        try:
            self.conn = pymysql.connect(
                host=DB_HOST,
                port=DB_PORT,
                user=DB_USER,
                passwd=DB_USER_PASSWORD,
                db=DB_NAME,
                connect_timeout=5,
                # Without autocommit a long-lived connection keeps reading the
                # snapshot of its first SELECT and never sees newer rows.
                autocommit=True,
                # Rows come back as dicts so callers can read columns by name.
                cursorclass=pymysql.cursors.DictCursor)
        except Exception as e:
            logger.error(e)
            logger.error("ERROR: Unexpected error: Could not connect to MySql instance.")
            raise

    def fetch(self, sql, args=None):
        """Run a query and return all matching rows.

        Args:
            sql: SQL text; use ``%s`` placeholders for values.
            args: Optional sequence or mapping of values bound to the
                placeholders by the driver, so they are escaped properly.

        Returns:
            All rows of the result set, each row a dict keyed by column name.
        """
        # The connection is created once at import time and reused across
        # invocations; re-establish it if the server dropped it meanwhile.
        self.conn.ping(reconnect=True)
        with self.conn.cursor() as cursor:
            cursor.execute(sql, args)
            return cursor.fetchall()


db_source = DBSource()


def main_handler(event, context):
    """Enrich each message with the address stored in the database.

    The CKafka Connector delivers an event shaped like::

        {
            "event": {
                "data": [msg1, msg2]
            }
        }

    In this demo each message is a JSON string with a user's basic profile
    (name, age, phone number), for example::

        '{"name": "Anny", "age": 21, "phone": "13111111111"}'

    The phone number is used to look the user up in the database, the
    ``address`` column of the first matching row is merged into the message,
    and a ``time`` field recording the processing time is added.

    Args:
        event: Dict holding the message list under ``event['event']['data']``.
        context: Invocation context; not used by this handler.

    Returns:
        ``{'result': 'Succeed', 'data': [...]}`` where ``data`` holds the
        enriched messages as dicts. ``address`` is ``None`` when no database
        row matches the phone number.

    Raises:
        KeyError: If the event or a message lacks an expected key.
        ValueError: If a message is not valid JSON.
    """
    print('start main_handler')
    data = event['event']['data']
    print('the length of msg body is [%s]' % len(data))
    table = '<替换成实际的数据表>'
    # A table name cannot be bound as a query parameter, so it is formatted
    # into the statement; it is a constant chosen by the developer, never
    # message data. The phone value is bound by the driver instead of being
    # interpolated, which quotes it correctly and prevents SQL injection.
    sql = 'SELECT * FROM {} WHERE phone = %s'.format(table)
    messages = []
    for message in data:
        # Parse the JSON string into a dict.
        message = json.loads(message)
        db_record = db_source.fetch(sql, (message['phone'],))
        # Merge the stored address into the message; an unknown phone number
        # yields None instead of aborting the whole batch.
        message['address'] = db_record[0]['address'] if db_record else None
        message['time'] = time.time()
        messages.append(message)
    # If processing fails, an error body such as {"result": "Failed"} can be
    # returned instead.
    return {
        'result': 'Succeed',
        'data': messages
    }
import re

# A phone number is accepted when it is a leading 1, a second digit from
# {3, 4, 5, 7, 8}, and nine more digits. The second-digit set is written
# without '|' separators: inside a character class '|' is a literal, so the
# former "[3|4|5|7|8]" let values such as "1|123456789" through the filter.
# Compiled once at import time rather than once per message.
PHONE_PATTERN = re.compile(r'^1[34578][0-9]{9}$')


def main_handler(event, context):
    """Keep only the messages that are valid phone numbers.

    The CKafka Connector delivers an event shaped like::

        {
            "event": {
                "data": [msg1, msg2, msg3]
            }
        }

    In this demo each message is a plain string, for example::

        {
            "event": {
                "data": ["13911112222", "13122223333", "2345678"]
            }
        }

    Args:
        event: Dict holding the message list under ``event['event']['data']``.
        context: Invocation context; not used by this handler.

    Returns:
        ``{'result': 'Succeed', 'data': [...]}`` where ``data`` contains, in
        their original order, only the messages that match the phone-number
        pattern. A log line is printed for each message that is dropped.

    Raises:
        KeyError: If ``event`` lacks the ``event``/``data`` keys.
        TypeError: If a message is not a string.
    """
    print('start main_handler')
    data = event['event']['data']
    print('the length of msg body is [%s]' % len(data))
    messages = []
    for message in data:
        if PHONE_PATTERN.match(message):
            messages.append(message)
        else:
            print('[%s] is not a valid phone number' % message)
    # If processing fails, an error body such as {"result": "Failed"} can be
    # returned instead.
    return {
        'result': 'Succeed',
        'data': messages
    }


if __name__ == '__main__':
    event = {
        "event": {
            "data": [
                "13911112222",
                "13122223333",
                "2345678"
            ]
        }
    }
    result = main_handler(event, {})
    print(result)
import base64
import json
import re
import time

# Captures the first three and last four digits of an 11-digit run so the
# middle four can be replaced with asterisks. Compiled once at import time
# instead of being looked up for every message.
_PHONE_MASK = re.compile(r"(\d{3})\d{4}(\d{4})")


def _transform(raw):
    """Turn one raw JSON-string message into its cleaned-up dict form."""
    record = json.loads(raw)
    return {
        # a -> name
        'name': record['a'],
        # b -> age
        'age': record['b'],
        # c -> phone, masked: '13111112222' -> '131****2222'
        'phone': _PHONE_MASK.sub(r"\1****\2", record['c']),
        # d -> product, base64-decoded back to text
        'product': base64.decodebytes(record['d'].encode()).decode(),
        # Time at which this record was processed.
        'time': time.time(),
    }


def main_handler(event, context):
    """Rename fields, mask the phone number and decode the product URL.

    The CKafka Connector delivers an event shaped like::

        {
            "event": {
                "data": [msg1, msg2]
            }
        }

    In this demo each message is a JSON string holding a user's basic
    profile under abbreviated keys: ``a`` (name), ``b`` (age), ``c`` (phone
    number) and ``d`` (favourite cloud product URL, base64-encoded).

    For every message the keys are restored to their full names, the phone
    number is masked, the product URL is base64-decoded, and a ``time`` field
    recording the processing time is added.

    Args:
        event: Dict holding the message list under ``event['event']['data']``.
        context: Invocation context; not used by this handler.

    Returns:
        ``{'result': 'Succeed', 'data': [...]}`` where ``data`` holds one
        dict per input message with the keys ``name``, ``age``, ``phone``,
        ``product`` and ``time``.

    Raises:
        KeyError: If the event or a message lacks an expected key.
        ValueError: If a message is not valid JSON or ``d`` is not valid
            base64.
    """
    print('start main_handler')
    data = event['event']['data']
    print('the length of msg body is [%s]' % len(data))
    messages = [_transform(raw) for raw in data]
    # If processing fails, an error body such as {"result": "Failed"} can be
    # returned instead.
    return {
        'result': 'Succeed',
        'data': messages
    }


if __name__ == '__main__':
    event = {
        "event": {
            "data": [
                '{"a": "Anny", "b": 21, "c": "13111111111", "d": "Y2xvdWQudGVuY2VudC5jb20vcHJvZHVjdC9zY2Y=\\n"}',
                '{"a": "Ernest", "b": 22, "c": "13111112222", "d": "Y2xvdWQudGVuY2VudC5jb20vcHJvZHVjdC9zY2Y=\\n"}',
            ]
        }
    }
    result = main_handler(event, {})
    print(result)