Работа с данными
После развертывания базовой архитектуры Data Lakehouse следующий шаг — наполнение сервиса данными и их анализ. В этой инструкции рассматриваются примеры загрузки в Data Lakehouse данных из трех различных источников и выполнения SQL-запросов для анализа загруженных данных. Все манипуляции с данными выполняются путем SQL-запросов во внешние системы с помощью Cloud Trino.
Пройдя все шаги инструкции, вы узнаете:
- как загружать в Data Lakehouse данные из различных внешних источников: БД PostgreSQL, БД MySQL, а также из виртуальной БД из базовой поставки Cloud Trino;
- как при помощи Cloud Trino выполнять SQL-запросы для анализа, обработки и трансформации данных, загруженных в Data Lakehouse;
- как выгружать данные из Data Lakehouse во внешние БД;
- как при помощи Cloud Trino выполнять гибридные аналитические SQL-запросы одновременно к нескольким источникам данных, включая Data Lakehouse и внешние БД.
-
Подготовьте внешние базы данных PostgreSQL и MySQL.
В качестве примера использованы:
- БД MySQL
mydb1, содержащая таблицуtable1с произвольной структурой; - БД PostgreSQL
db1, содержащая таблицуtable2с произвольной структурой.
- БД MySQL
-
Разверните базовую архитектуру сервиса Data Lakehouse.
В качестве примера в процессе развертывания при настройке Cloud Trino созданы:
- подключение
s3db1к хранилищу S3 VK Cloud; - подключение
pgdb1к БД PostgreSQLdb1; - подключение
mysqldb1к БД MySQLmydb1.
После развертывания Cloud Trino в конфигурацию автоматически добавляются два подключения к виртуальным БД для генерации наборов синтетических данных большого объема и тестирования: tpcds и tpch. В примере используется подключение tpcds, а для запросов к виртуальной БД — одна из доступных в ней схем данных
sf10.Для запросов к БД PostgreSQL в примере используется стандартная схема данных
public. - подключение
-
Установите на ваш компьютер любой SQL-клиент для отправки запросов в Cloud Trino. Далее все запросы выполняются в SQL-клиенте.
-
Создайте схему
sch1для загрузки данных из внешних БД MySQL и PostgreSQL:CREATE SCHEMA s3db1.sch1; -
Создайте схему данных
sch2для хранения результатов вычислений:CREATE SCHEMA s3db1.sch2; -
Укажите схему данных, которая будет использоваться для загрузки данных из виртуальной БД:
CREATE SCHEMA s3db1.sf10;
Ожидаемый результат: в SQL-клиенте отображается сообщение об успешном создании новых схем данных.
Информация о каждой схеме хранилища S3 хранится в сервисе Cloud Iceberg Metastore, пока не создана первая таблица в этой схеме (через команду CREATE TABLE).
Как только будет создана первая таблица, Cloud Iceberg Metastore создаст в хранилище S3 директорию и сохранит в ней информацию:
- табличные данные в формате PARQUET;
- метаданные в формате JSON;
- дополнительные файлы, статистика и т.д.
-
Запросите данные из таблицы
table1внешней БД MySQL и загрузите их в новую таблицуtable1_from_mysqlв схемеsch1хранилища S3:CREATE TABLE s3db1.sch1.table1_from_mysql ASSELECT * FROM mysqldb1.mydb1.table1; -
Запросите данные из таблицы
table2внешней БД PostgreSQL и загрузите их в новую таблицуtable2_from_pgв схемеsch1хранилища S3:CREATE TABLE s3db1.sch1.table2_from_pg ASSELECT * FROM pgdb1.public.table2;
Ожидаемый результат: в SQL-клиенте отображается сообщение об успешном создании новых таблиц.
Запросите данные из нескольких таблиц виртуальной БД (customer, store, store_sales, store_returns) и загрузите их в одноименные новые таблицы в схеме sf10 хранилища S3:
CREATE TABLE s3db1.sf10.customer ASSELECT * FROM tpcds.sf10.customerCREATE TABLE s3db1.sf10.store ASSELECT * FROM tpcds.sf10.storeCREATE TABLE s3db1.sf10.store_sales ASSELECT * FROM tpcds.sf10.store_salesCREATE TABLE s3db1.sf10.store_returns ASSELECT * FROM tpcds.sf10.store_returns
Ожидаемый результат: в SQL-клиенте отображается сообщение об успешном создании новых таблиц.
Цель анализа — выявление клиентов с подозрительно высоким уровнем возвратов (больше, чем их покупки), что может указывать на мошенничество или проблемы с качеством товаров.
Данные для анализа загружены в схему sf10 хранилища S3 на предыдущем шаге:
- таблица
customer— данные о клиентах; - таблица
store— данные о магазинах; - таблицы
store_salesиstore_returns— данные о продажах и возвратах.
Для проведения анализа выполните SQL-запрос:
CREATE TABLE s3db1.sch2.analysis_from_s3 ASWITH sales AS (SELECT c.c_customer_sk,c.c_first_name,c.c_last_name,c.c_email_address,SUM(ss.ss_coupon_amt) AS coupon,SUM(ss.ss_sales_price) AS salesprice,SUM(ss.ss_ext_discount_amt) AS discountFROM s3db1.sf10.customer cJOIN s3db1.sf10.store_sales ss ON ss.ss_customer_sk = c.c_customer_skJOIN s3db1.sf10.store s ON s.s_store_sk = ss.ss_store_skGROUP BY c.c_customer_sk,c.c_first_name,c.c_last_name,c.c_email_address),storereturns AS (SELECT c.c_customer_sk,SUM(sr.sr_return_amt) AS returnamout,SUM(sr.sr_fee) AS feeFROM s3db1.sf10.customer cJOIN s3db1.sf10.store_returns sr ON sr.sr_customer_sk = c.c_customer_skJOIN s3db1.sf10.store s ON s.s_store_sk = sr.sr_store_skGROUP BY c.c_customer_sk)SELECT ss.c_customer_sk,ss.c_first_name,ss.c_last_name,ss.c_email_address,ss.salesprice,ss.discount,sr.returnamout,sr.feeFROM sales ssJOIN storereturns sr ON ss.c_customer_sk = sr.c_customer_skWHERE ss.salesprice < sr.returnamoutORDER BY ss.discount DESC limit 10
Cloud Trino выполняет SQL-запрос в следующем порядке:
- Составляет план выполнения SQL-запроса.
- Через API сервиса Iceberg Metastore получает доступ к таблицам хранилища S3 в схеме
sf10, куда ранее была записана информация из виртуальной БД. - В соответствии с планом выполнения читает и обрабатывает данные: применяет указанные в запросе фильтры и критерии группировки, вычисляет агрегаты и т.д.
- В результате обработки данных формирует список клиентов со связанными объектами, который удовлетворяет SQL-запросу.
- Записывает результирующую выборку данных в виде новой таблицы
analysis_from_s3в схемеsch2хранилища S3.
Ожидаемый результат: в SQL-клиенте отображаются 10 записей о клиентах с подозрительно высоким уровнем возвратов, получивших наибольшие скидки.
-
Запросите данные из таблицы
customerв схемеsf10хранилища S3 и сохраните их в виде одноименной новой таблицы во внешней БД MySQL.CREATE TABLE mysqldb1.mydb1.customer ASSELECT * FROM s3db1.sf10.customer; -
Запросите данные из таблицы
storeв схемеsf10хранилища S3 и сохраните их в виде одноименной новой таблицы во внешней БД PostgreSQL.CREATE TABLE pgdb1.public.store ASSELECT * FROM s3db1.sf10.store;
Ожидаемый результат: в SQL-клиенте отображается сообщение об успешном создании новых таблиц.
Цель анализа — выявление клиентов с подозрительно высоким уровнем возвратов (больше, чем их покупки), что может указывать на мошенничество или проблемы с качеством товаров.
Данные для анализа извлекаются одновременно из трех источников:
- данные о клиентах — из таблицы
customerво внешней БД MySQL; - данные о магазинах — из таблицы
storeво внешней БД PostgreSQL; - данные о продажах и возвратах — из таблиц
store_salesиstore_returnsв схемеsf10хранилища S3.
Для проведения анализа выполните SQL-запрос:
CREATE TABLE s3db1.sch2.analysis_hybrid ASWITH sales AS (SELECT c.c_customer_sk,c.c_first_name,c.c_last_name,c.c_email_address,SUM(ss.ss_coupon_amt) AS coupon,SUM(ss.ss_sales_price) AS salesprice,SUM(ss.ss_ext_discount_amt) AS discountFROM mysqldb1.mydb1.customer cJOIN s3db1.sf10.store_sales ss ON ss.ss_customer_sk = c.c_customer_skJOIN pgdb1.public.store s ON s.s_store_sk = ss.ss_store_skGROUP BY c.c_customer_sk,c.c_first_name,c.c_last_name,c.c_email_address),storereturns AS (SELECT c.c_customer_sk,SUM(sr.sr_return_amt) AS returnamout,SUM(sr.sr_fee) AS feeFROM mysqldb1.mydb1.customer cJOIN s3db1.sf10.store_returns sr ON sr.sr_customer_sk = c.c_customer_skJOIN pgdb1.public.store s ON s.s_store_sk = sr.sr_store_skGROUP BY c.c_customer_sk)SELECT ss.c_customer_sk,ss.c_first_name,ss.c_last_name,ss.c_email_address,ss.salesprice,ss.discount,sr.returnamout,sr.feeFROM sales ssJOIN storereturns sr ON ss.c_customer_sk = sr.c_customer_skWHERE ss.salesprice < sr.returnamoutORDER BY ss.discount DESC limit 10
Cloud Trino выполняет SQL-запрос в следующем порядке:
- Составляет план выполнения SQL-запроса.
- Через API сервиса Iceberg Metastore получает доступ к таблицам хранилища S3 в схеме
sf10, куда ранее была записана информация из виртуальной БД. - Отправляет напрямую во внешние БД запросы на получение данных из таблицы
customerБД MySQL и таблицыstoreБД PostgreSQL. - В соответствии с планом выполнения читает и обрабатывает данные: применяет указанные в запросе фильтры и критерии группировки, вычисляет агрегаты и т.д.
- В результате обработки данных формирует список клиентов со связанными объектами, который удовлетворяет SQL-запросу.
- Записывает результирующую выборку данных в виде новой таблицы
analysis_from_s3в схемеsch2хранилища S3.
Ожидаемый результат: в SQL-клиенте отображаются 10 записей о клиентах с подозрительно высоким уровнем возвратов, получивших наибольшие скидки.
Развернутая ранее инфраструктура сервиса Data Lakehouse потребляет вычислительные ресурсы и тарифицируется. Если она вам больше не нужна, удалите ее.