1.配置专有网络和交换机
2.创建安全组,控制流量的入站
3.在阿里云创建ECS云服务器
1cpu,4g,node1
1cpu,2g,node2
1cpu,2g,node3
finalshell连接三个服务器
完成在阿里云服务器上设置主机名映射、JDK、SSH免密等基础配置
目标:10–20 分钟在本机把 Hadoop 跑起来并成功执行一个任务。
java -version 验证)export HADOOP_HOME=~/app/hadoop-3.3.x
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk # 以你的路径为准
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
编辑 $HADOOP_HOME/etc/hadoop 下 4 个文件(示例精简版):
core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name><value>hdfs://localhost:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name><value>/tmp/hadoop</value>
</property>
</configuration>
hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name><value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name><value>file:///tmp/hadoop/nn</value>
</property>
<property>
<name>dfs.datanode.data.dir</name><value>file:///tmp/hadoop/dn</value>
</property>
</configuration>
yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value>
</property>
</configuration>
mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name><value>yarn</value>
</property>
</configuration>
hdfs namenode -format
start-dfs.sh
start-yarn.sh
jps # 看到 NameNode / DataNode / ResourceManager / NodeManager 即正常
hdfs dfs -mkdir -p /tmp/input
echo "hello hadoop hello bigdata" > /tmp/local.txt
hdfs dfs -put /tmp/local.txt /tmp/input/
# 运行自带 MapReduce 示例(WordCount)
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount \
/tmp/input /tmp/output
hdfs dfs -cat /tmp/output/part-r-00000 | head
HDFS
# 查看根目录下的文件
hdfs dfs -ls /
# 在HDFS上创建/data/ods
hdfs dfs -mkdir -p /data/ods
# 把本地文件系统的local.csv上传到HDFS的/data/ods/
hdfs dfs -put local.csv /data/ods/
# 从HDFS下载到当前本地目录
hdfs dfs -get /data/ods/local.csv
# 在HDFS端串流输出文件内容到stdout,在用本地head截取前10行查看
hdfs dfs -cat /data/ods/local.csv | head
# 显示/data目录下每个条目的大小
hdfs dfs -du -h /data
# 递归删除HDFS路径/temp/output,跳过回收站直接删除
hdfs dfs -rm -r -skipTrash /temp/output
echo 检查Hadoop环境
hdfs dfs -ls /
echo 在HDFS创建目录 /itcast/itheima
hdfs dfs -rm -r -skipTrach /itcast/itheima
hdfs dfs -mkdir -p /itcast/itheima
hdfs dfs -ls /itcast
echo 上传本机 /etc/hosts 到HDFS的 /itecast/itheima/
hdfs dfs -put -f /etc/hosts /itcast/itheima/
hdfs dfs -ls /itcast/itheima
echo 查看HDFS中刚上传的文件内容
hdfs dfs -cat /itcast/itheima/hosts | head -n 20
echo 把itheima加到文件最后一行
printf "itheima\n" | hdfs dfs -appendToFile - /itcast/itheima/hosts
echo 从HDFS下载该文件到本地指定目录
mkdir -p ~/itcast_lcoal
hdfs dfs -get -f /itcast/itheima/hosts ~/itcast_local/
ls -l ~/itcast_local/hosts
echo 在HDFS创建目录 /itcast/bigdata,并复制文件过去
hdfs dfs -mkdir -p /itcast/bigdata
hdfs dfs -cp /itcast/itheima/hosts /itcast/bigdata/
hdfs dfs -ls /itcast/bigdata
echo 把/itcast/itheima/hosts 重命名为 /itcast/itheima/myhost
hdfs dfs -mv /itcast/itheima/hosts /itcast/itheima/myhost
hdfs dfs -ls /itcast/itheima
echo 删除 /itcast 整个目录树
hdfs dfs -rm -r -skipTrach /itcast
hdfs dfs -ls /
NameNode管理DataNode
hflush/hsync/close)前其它读者不可见;close 后原子可见。echo 临时修改
hdfs dfs -setrep -R 2 /itcast/itheima
hdfs dfs -ls /itcast/itheima
hdfs fsck /itcast/itheima -files -blocks -racks
echo 检查某个文件的详细状态
hdfs fsck /itcast/itheima/hosts -files -blocks -locations
echo 检查某个HDFS的健康状态
hdfs fsck / -files -blocks -locations > fack_report.txt
echo 查看丢失或损坏的块
hdfs fsck / -list-corruptfileblocks
NameNode 常驻内存保存 命名空间与块索引 等元数据,典型包括:
直观理解:NameNode 记住“有哪些文件(目录树)、每个文件被切成哪些块、以及块副本目前在哪些 DN”。真正的数据块只在 DataNode 上。
NameNode 的持久化元数据由两类文件组成:
启动时流程:
多数集群采用 QJM(Quorum Journal Manager):主/备 NN 都把 edits 同步写入 JournalNode 集群;Standby 定期合并并做 热备 Checkpoint。
分布式计算的两种工作模式
| 分散汇总 | “大家各算各的,再汇总结果”——强调并行与自治,典型于大规模数据聚合与流计算。 |
| 中心调度步骤执行 | “由指挥中心统一调度每一步”——强调顺序控制与全局可见,典型于批处理或复杂依赖任务。 |

MapReduce一种分布式批处理编程模型与执行框架:把大数据计算拆成并行的 Map(映射) 和 Reduce(归并/聚合) 两阶段,中间由系统完成 Shuffle(按 key 分组、排序、拉取)。在 Hadoop 中,作业由 YARN 调度到各节点执行,具备容错与数据本地性。
ResourceManager管理NodeManager
YARN(Yet Another Resource Negotiator)是一个分布式资源管理系统,它是Hadoop生态系统中的重要组件之一。YARN的主要功能是管理和调度计算资源,确保在分布式计算环境中资源的高效利用。
ResourceManager (RM):
功能:ResourceManager 是 YARN 的中央资源调度组件,负责全局的资源管理和资源调度。它有两个主要的子组件:
NodeManager (NM):
功能:NodeManager 运行在每个工作节点(Node)上,负责管理本节点的资源使用情况。它的主要任务包括:
ApplicationMaster (AM):
Container:
HistoryServer:
ResourceManager 高可用性(HA):
NodeManager 高可用性(HA):
Zookeeper:
YARN Timeline Server:
YARN Application Client:
echo 启动YARN资源管理器和节点管理器
start-yarn.sh
echo 停止YARN资源管理器和节点管理器
stop-yarn.sh
echo 查看YARN集群的状态
yarn top
echo 查看ResourceManager状态
yarn resourceManager
echo 查看NodeManager状态
yarn node -list
echo 提交一个YARN应用
yarn jar <your-application-jar> <main-class> <args>
echo 查看YARN作业的状态
yarn application -status <applicationId>
echo 查看所以运行中的作业
yarn application -list
echo 杀掉正在运行的作业
yarn application -kill <applicationId>
echo 查看某个作业的日志信息
yarn logs -applicationId <applicationId>
echo 查看容器信息
yarn container -list
echo 查看节点的容器列表
yarn node -containers <node-id>
echo 查看所有节点的状态
yarn node -list
echo 查看某个节点的详细信息
yarn node -status <node-id>
echo 刷新NodeManager
yarn rmadmin -refreshNodes
echo 查看资源的总体使用情况
yarn top
echo 查看YARN队列的状态
yarn queue -status
echo 查看ResourceManager高可用性状态
yarn rmadmin -getServiceState <rm-id>
echo 切换ResourceManager主备节点
yarn rmadmin -transitionToAction <rm-id>
Apache Hive 是一个数据仓库工具,用于大规模数据存储和分析,基于 Hadoop 构建。它提供了一种类似 SQL 的查询语言(称为 HiveQL)来操作和查询存储在 Hadoop 分布式文件系统(HDFS)上的数据。Hive 最初由 Facebook 开发,用于简化 Hadoop 上大数据的处理与分析,后来成为 Apache 软件基金会的顶级项目。
Hive 提供了一种高层次的抽象,用户可以通过 SQL 语法查询和操作存储在 HDFS 中的数据,而不需要关心底层的 MapReduce 实现细节。

-- 查看所有数据库
show databases;
-- 查看当前数据库
select current_database();
-- 切换数据库
use mydb;
-- 查看所有表
show tables;
-- 查看表结构
describe employees;
-- 查看表数据
select * from employees limit 10;
-- 删除表
drop table employees;
-- 创建数据库
create database mydb;
-- 创建分区表
create table employees(id int,name string) paritioned by (department string);
-- 添加分区
alter table employees add partition(department='HR') location '/path/to/hdfs';
-- 加载数据到表
load data inpath '/path/to/data.csv' into table employees;
-- 执行查询
select name,age from employees where age > 30;
-- 创建视图
create view employees_above_30 as select name,age from employees where age > 30;
定义:内部表是由 Hive 完全控制的数据表。Hive 管理数据的生命周期,当删除内部表时,数据也会被删除。
特性:
user/hive/warehouse 目录)。CREATE TABLE 创建。create table employees (
id int,
name string,
age int,
department string
)
row format delimited
fields terminated by ','
stored as textfile;
定义:外部表指的是表的数据存储在 Hive 以外的地方,Hive 只是为其提供了结构和访问方式。数据存储的位置可以是 HDFS、HBase 或其他数据源。外部表的一个关键特性是删除表时,数据不会被删除。
特性:
create external table employees_external (
id int,
name string,
age int,
department string
)
row format delimited
fields terminated by ','
stored as textfile
location '/user/hive/warehouse/employees_data';
定义:分区表是一种根据某个列的值将数据分割存储的表。分区将数据按列的值(如日期、地区等)划分为多个子目录,通常用于提高查询性能。
特性:
create table employees_partitioned (
id int,
name string,
age int
)
partitioned by (department string)
row format delimited
fields terminated by ','
stored as textfile;
-- 添加数据到特定分区
alter table employees_partitioned add partition (department='HR')
location '/user/hive/warehouse/employees/HR';
定义:分桶表是通过对某个列(通常是主键或某些离散字段)进行哈希分桶的方式将数据存储在不同的文件中。每个桶(文件)存储的是该列值的某个子集,常用于提高查询效率,尤其是处理大规模数据时。
特性:
JOIN、GROUP BY 等,尤其是当查询涉及到分桶列时。create table employees_bucketed (
id int,
name string,
age int
)
clustered by (id) into 4 buckets
row format delimited
fields terminated by ','
stored as textfile;
使用load data语句将数据从本地文件系统获HDFS中加载到Hive表中。
1、从本地文件系统导入数据
load data local inpath '/local/path/to/datafile' into table your_table;
2、从HDFS导入数据
load data inpath '/hdfs/path/to/datafile' into table your_table;
3、从Hive表中选择数据并插入到另一个表
insert into table target_table select * from source_table;
1、导出到HDFS中的指定目录
insert overwrite directory '/hdfs/path/to/output_dir'
row format delimited
fields terminated by ','
select * from your_table;
2、使用hive -e命令将查询结果导出到文件系统
hive -e "select * from your_table" > /local/path/to/output_file
-- 修改表名
alter table old_table_name renamne to new_table_name;
alter table employees rename to employee_data;
-- 添加列
alter table table_name add columns(new_column_name data_type);
alter table employees add columns(salary double);
-- 删除列
alter table table_name replace columns(column_name data_type, ...);
alter table employees replace columns(id int, name string, department string);
-- 修改列的数据类型
alter table table_name change column old_column_name new_column_name new_data_type;
alter table employees change column age age bigint;
-- 修改表的存储格式
alter table table_name set fileformat new_file_format;
alter table employees set fileformat orc;
-- 修改表的存储位置
alter table table_name set location 'new_location';
alter table employees set location '/user/hive/warehouse/new_employees_data/'
-- 修改表的表属性
alter table table_name set tblproperties('property_name'='value');
alter table employees set tblproperties('compression'='snappy');
-- 添加分区
alter table table_name add partition (partition_spec) [location 'path'];
alter table employees_partitioned add partition (year-2021, month=12) location '/user/hive/warehhouse/employees/2021/12';
-- 删除分区
alter table table_name drop partition (partition_spec);
alter table employees_partitioned drop partition(year=2020, month=06);
-- 创建包含array的表
create table student_scores (
student_id int,
student_name string,
scores array<int>
);
-- 插入数据
insert into student_scores values
(1, 'jack', array(80,90,100)),
(2, 'alice', array(95,60,40)),
(3, 'bob', array(75,90,80));
-- 查询整个array
select student_id, student_name, scores
from student_scores;
-- 获取array中的特定元素:使用scores[0]获取数组的第一个元素
select student_id,student_name,scores[0] as first_score
from student_scores;
-- 使用size()函数获取数组的长度
select student_id, student_name, size(scores) as num_scores
from student_scores;
-- 获取array的子集:使用slice()函数可以获取数组的指定范围
select student_id, student_name, slice(scores, 1, 2) as first_two_scores
from student_scores;
-- 数组拼接:将两个array合并为一个array
select student_id, concat(scores, array(95, 98)) as all_scores
from student_scores;
-- 查找元素:检查数组是否包含某个特定的元素
select student_id, array_contains(scores, 90) as has_score_90
from student_scores;
-- 数组去重:返回数组中不重复的元素
select student_id, array_distinct(scores) as distinct_scores
from student_scores;
-- 创建包含map的表
create table student_scores (
student_id int,
student_name string,
scores map<string, int>
);
-- 插入数据
insert into student_scores values
(1, 'jack', map('math', 80, 'english', 90, 'history', 89)),
(2, 'alice', map('math', 80, 'english', 90, 'history', 89)),
(3, 'bob', map('math', 80, 'english', 90, 'history', 89));
-- 查询整个map
select student_id, student_name, scores
from student_scores;
-- 查询map中的特定值
select student_id, student_name, scores['math'] as math_score
from student_scores;
-- 使用size函数获取map中键值对的数量
select student_id, student_name, size(scores) as num_courses
from student_scores;
-- 使用map_keys()和map_values()获取所有键或值
select student_id, student_name, map_keys(scores) as course_names
from student_scores;
-- 检查map是否包含某个键
select student_id, student_name, map_contains_key(scores, 'math') as has_math_score
from student_scores;
-- 创建包含struct的表
create table student_info(
student_id int,
student_name string,
student_info struct<name:string, age:int, gender:string>
);
-- 插入数据
insert into student_info values
(1, 'John', struct('John', 20, 'Male')),
(2, 'alice', struct('alice smith', 22, 'Male')),
(3, 'bob', struct('bob jognson', 23, 'Male'));
-- 查询整个struct
select student_id, student_name, student_info
from student_info;
select student_id, student_name, student_info.name as student_name
student_info.age as student_age, student_info.gender as student_gender
from student_info;
-- 使用struct存储嵌套数据
create table student_courses(
student_id int,
student_name string,
courses array<struct<course_name:string, score:int, credits:int>>
);
-- 使用struct进行数据转换
select student_id, student_name
named_struct('course_name', 'math', 'score', 95, 'credits', 3) as math_course_info
from student_courses;
-- RLIKE 正则表达式匹配
select * from table_name where column_name rlike 'pattern';
select * from users where email rlike 'gmail';
-- TABLESAMPLE 数据抽样
select * from table_name tablesample(bernoulli|system) percent n;
-- 抽10%的数据
select * from sales tablesample(bernoulli) percent 10;
-- 虚拟列。total_price就是
select order_id, quantity, unit_price, (quantity * unit_price) as total_price from orders;
-- 将所有名字转化成大写并且连接一个字符串
select concat(upper(name), '- employee') from employees;
-- 将日期格式化为YYYY-MM-DD形式
select date_format(event_date, 'yyyy-MM-dd') from events;
-- 条件判断
select name,
case
when score >= 60 then 'Pass'
else 'Fail'
end as result
from students;
-- 条件
select if(score >= 60, 'Pass', 'Fail') from students;
-- 返回第一个非空值
select coalesce(nickname, 'Anonymous') from users;
-- 如果两个值相等,则返回NULL,否则返回第一个值
select nullif(price, 0) from products;
聊天平台每天都会有大量的用户在线,会出现大量的聊天数据,通过对聊天数据的统计分析,可以更好的对用户构建精准的用户画像,为用户提供更好的服务以及实现高ROI的平台运营推广,给公司的发展决策提供精确的数据支撑我们将基于一个社交平台App的用户数据,完成相关指标的统计分析并结合BI工具对指标进行可视化展现。
需求
统计今日总消息量
create table db_msg.tb_rs_total_msg_cnt comment '每日消息总量' as
select msg_day, count(*) as total_msg_cnt from db_msg.tb_msg_etl group by msg_day;
统计今日每小时消息量、发送和接收用户数
create table db_msg.tb_rs_hour_msg_cnt comment '每小时消息量趋势' as
select
msg_hour,
count(*) as total_msg_cnt,
count(distinct sender_account) as sender_user_cnt,
count(distinct receiver_account) as receiver_user_cnt
from db_msg.tb_msg_etl
group by msg_hour;
统计今日各地区发送消息数据量
create table db_msg.tb_rs_loc_cnt comment '每日各地区发送消息总量' as
select
msg_day, sender_lng, sender_lat, count(*) as total_msg_cnt
from db_msg.tb_msg_etl
group by msg_day, sender_lng, sender_lat;
统计今日发送消息和接收消息的用户数
create table db_msg.tb_rs_user_cnt comment '每日发送和接收消息的人数' as
select
msg_day,
count(distinct sender_account) as sender_user_cnt,
count(distinct receiver_account) as receiver_user_cnt
from db_msg.tb_msg_etl
group by msg_day;
统计今日发送消息最多的Top10用户
create table db_msg.tb_rs_s_user_top10 comment '发送消息最多的10歌用户' as
select
sender_name,
count(*) as sender_msg_cnt
from db_msg.tb_msg_etl group by sender_name
order by sender_msg_cnt desc
limit 10;
统计今日接收消息最多的Top10用户
create table db_msg.tb_rs_r_user_top10 comment '接收消息最多的10个用户' as
select
receiver_name,
count(*) as receiver_msg_cnt
from db_msg.tb_msg_etl group by receiver_name
order by receiver_msg_cnt desc
limit 10;
统计发送人的手机型号分布情况
create table db_msg.tb_rs_sender_phone comment '发送人的手机型号分布' as
select
sender_phonetype,
count(*) as cnt
from db_msg.tb_msg_etl group by sender_phonetype;
统计发送人的设备操作系统分布情况
create table db_msg.tb_rs_sender_os comment '发送人的os分布' as
select
sender_os,
count(*) as cnt
from db_msg.tb_msg_etl group by sender_os;
-- 建立数据库,建立表格
drop database if exists db_msg cascade;
create database db_msg;
use db_msg;
show databases;
--如果表已存在就删除
drop table if exists db msg.tb_msg_source:
--建表
create table db msg.tb msg source(
msg_time string comment“消息发送时间",
sender name string comment"发送人昵称",
sender_account string comment“发送人账号",
sender_sex string comment"发送人性别",
sender_ip string comment "发送人ip地址",
sender_os string comment "发送人操作系统",
sender_phonetype string comment"发送人手机型号",
sender_network string comment"发送人网络类型",
sender_gps string comment"发送人的GPS定位",
receiver_name string comment"接收人昵称",
receiver_ip string comment"接收人IP",
receiver_account string comment“接收人账号",
receiver_os string comment"接收人操作系统",
receiver_phonetype string comment"接收人手机型号",
receiver_network string comment“接收人网络类型",
receiver_gps string comment"接收人的GPS定位",
receiver_sex string comment"接收人性别",
msg_type string comment"消息类型",
distance string comment"双方距离",
message string comment“消息内容
);
-- 问题
-- Some data fields are empty, which means the data is not valid.
select
msg_time,
sender_name,
sender_gps
from db_msg.tb_msg_scource
where length(sender_gps) = 0
limit 10;
-- We need to count the number of messages per day and per hour, but the data only has an overall time field and no day or hour fields, which makes it difficult to process.
select
msg_time
from db_msg.tb_msg_source
limit 10;
-- We need to build a visual map of the region using longitude and latitude, but the GPS longitude and latitude data is only one field.
select
msg_time,
sender_name,
sender_gps
from db_msg.tb_msg_scource
where length(sender_gps) = 0
limit 10;
-- 需要统计每天、每个小时的消息量,但是数据没有天和小时字段,只有整体时间字段,不好处理
select
msg_time
from db_msg.tb_msg_source
limit 10;
-- 需要对经度和纬度构建地区的可视化地图,但是数据中 GPS 经纬度为一个字段
select
msg_time,
sender_name,
sender_gps
from db_msg.tb_msg_scource
where length(sender_gps) = 0
limit 10;
-- We need to count the number of messages per day and per hour, but the data only has an overall time field and no day or hour fields, which makes it difficult to process.
select
msg_time
from db_msg.tb_msg_source
limit 10;
-- We need to build a visual map of the region using longitude and latitude, but the GPS longitude and latitude data is only one field.
select
msg_time,
sender_name,
sender_gps
from db_msg.tb_msg_scource
where length(sender_gps) = 0
limit 10;
-- 需要统计每天、每个小时的消息量,但是数据没有天和小时字段,只有整体时间字段,不好处理
select
msg_time
from db_msg.tb_msg_source
limit 10;
-- 需要对经度和纬度构建地区的可视化地图,但是数据中GPS经纬度为一个字段
select
sender_gps
from db_msg.tb_msg_source
limit 10;
-- 解决方案
-- 对字段为空的不合法数据进行过滤
-- 通过时间字段构建天和小时字段
-- 从GPS的经纬度中提取经度和纬度
-- 将ETL以后的结果保存到一张新的Hive表中
insert overwrite table db_msg.tb_msg_etl
select
*,
date(msg_time) as msg_day,
hour(msg_time) as msg_hour,
split(sender_gps, ',')[0] as sender_lng,
split(sender_gps, ',')[1] as sender_lat
from db_msg.tb_msg_source
where length(sender_gps) > 0;