基于Mahout实现K-Means聚类

需求分析

需要对数据集进行预处理,选择合适的特征进行聚类分析,确定聚类的数量和初始中心点,调用Mahout提供的K-Means算法进行聚类计算,评估聚类结果的准确性和稳定性。同时,需要对Mahout的使用和参数调优进行深入学习和实践,以保证聚类结果的有效性和可靠性。

系统实现

    1.对实验整体的理解:

    本次实验,我们的目的是理解聚类的原理,并且掌握常见聚类的算法,以及掌握使用Mahout实现K-Means聚类分析算法的过程。

     2.实验整体流程分析:

  • 创建项目,导入开发依赖包
  • 编写工具类
  • 编写聚类分析的代码
  • 将聚类结果输出
  • 评估聚类的效果

     3.准备工作:

  • 使用IDEA创建一个Maven项目:mahout_kmeans_demo

 

  • 修改pom.xml文件,导入开发MapReduce所需的Jar包
 <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-mr</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-math</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-hdfs</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-integration</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-examples</artifactId>
            <version>0.13.0</version>
        </dependency>
</dependencies>

下载相关依赖包

等待pom.xml文件不再出现错误即可 

  • 准备实验数据并下载

  • 启动Hadoop集群。

终端输入start-all.sh

可以使用jps命令查看集群启动情况。

     4.执行聚类过程:

  • 编写工具类HdfsUtil,对HDFS的基本操作进行封装
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;

import java.io.IOException;
import java.net.URI;

public class HdfsUtil {
    private static final String HDFS = "hdfs://master:9000/";
    private String hdfsPath;
    private Configuration conf;

    public HdfsUtil(Configuration conf) {
        this(HDFS, conf);
    }

    public HdfsUtil(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    public static JobConf config() {
        JobConf conf = new JobConf(HdfsUtil.class);
        conf.setJobName("HdfsDAO");
        return conf;
    }

    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
        fs.close();
    }

    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.deleteOnExit(path);
        System.out.println("Delete: " + folder);
        fs.close();
    }

    public void ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
        }
        System.out.println("==========================================================");
        fs.close();
    }

    public void createFile(String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        byte[] buff = content.getBytes();
        FSDataOutputStream os = null;
        try {
            os = fs.create(new Path(file));
            os.write(buff, 0, buff.length);
            System.out.println("Create: " + file);
        } finally {
            if (os != null)
                os.close();
        }
        fs.close();
    }

    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
        fs.close();
    }

    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from" + remote + " to " + local);
        fs.close();
    }

    public void cat(String remoteFile) throws IOException {
        Path path = new Path(remoteFile);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FSDataInputStream fsdis = null;
        System.out.println("cat: " + remoteFile);
        try {
            fsdis = fs.open(path);
            IOUtils.copyBytes(fsdis, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(fsdis);
            fs.close();
        }
    }
}
  • 编写KMeansMahout类,执行聚类过程
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.conversion.InputDriver;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.utils.clustering.ClusterDumper;

public class KMeansMahout {
    private static final String HDFS = "hdfs://master:9000";

    public static void main(String[] args) throws Exception {
        String localFile = "/home/data/iris.dat";
        //  mahout输出至HDFS的目录
        String outputPath = HDFS + "/user/hdfs/kmeans/output";
        //  mahout的输入目录
        String inputPath = HDFS + "/user/hdfs/kmeans/input/";

        //  canopy算法的t1和t2
        double t1 = 2;
        double t2 = 1;
        //  收敛阀值
        double convergenceDelta = 0.5;
        //  最大迭代次数
        int maxIterations = 10;

        Path output = new Path(outputPath);
        Path input = new Path(inputPath);
        Configuration conf = new Configuration();

        HdfsUtil hdfs = new HdfsUtil(HDFS, conf);
        hdfs.rmr(inputPath);
        hdfs.mkdirs(inputPath);
        hdfs.copyFile(localFile, inputPath);
        hdfs.ls(inputPath);

        //  每次执行聚类前,删除掉上一次的输出目录
        HadoopUtil.delete(conf, output);
        //  执行聚类
        run(conf, input, output, new EuclideanDistanceMeasure(), t1, t2, convergenceDelta, maxIterations);
    }

    private static void run(Configuration conf, Path input, Path output,
                            EuclideanDistanceMeasure euclideanDistanceMeasure, double t1, double t2,
                            double convergenceDelta, int maxIterations) throws Exception {

        Path directoryContainingConvertedInput = new Path(output, "data");

        System.out.println("Preparing  Input");
        //  将输入文件序列化,并选取RandomAccessSparseVector作为保存向量的数据结构
        InputDriver.runJob(input, directoryContainingConvertedInput,
                "org.apache.mahout.math.RandomAccessSparseVector");

        System.out.println("Running  Canopy  to  get  initial  clusters");
        //  保存canopy的目录
        Path canopyOutput = new Path(output, "canopies");

        //  执行Canopy聚类
        CanopyDriver.run(conf, directoryContainingConvertedInput, canopyOutput,
                euclideanDistanceMeasure, t1, t2, false, 0.0, false);

        System.out.println("Running  KMeans");
        //  执行k-means聚类,并使用canopy目录
        KMeansDriver.run(conf, directoryContainingConvertedInput,
                new Path(canopyOutput, Cluster.INITIAL_CLUSTERS_DIR + "-final"),
                output, convergenceDelta, maxIterations, true, 0.0, false);

        System.out.println("run  clusterdumper");
        //  将聚类的结果输出至HDFS
        ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"),
                new Path(output, "clusteredPoints"));
        clusterDumper.printClusters(null);
    }
}

在KmeansMahout类上点击右键并执行程序

 执行结果在HDFS目录中

     5.解析聚类结果:

  • 从Mahout的输出目录下提取出所要的信息

  • 编写ClusterOutput类,解析聚类后结果
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
import org.apache.mahout.math.Vector;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ClusterOutput {
    private static final String HDFS = "hdfs://master:9000";

    public static void main(String[] args) {
        try {
            //   需要被解析的mahout的输出文件
            String clusterOutputPath = "/user/hdfs/kmeans/output";
            //   解析后的聚类结果,将输出至本地磁盘
            String resultPath = "/home/data/result.txt";

            BufferedWriter bw;
            Configuration conf = new Configuration();
            conf.set("fs.default.name", HDFS);
            FileSystem fs = FileSystem.get(conf);

            SequenceFile.Reader reader = null;
            reader = new SequenceFile.Reader(fs, new Path(clusterOutputPath + "/clusteredPoints/part-m-00000"), conf);
            bw = new BufferedWriter(new FileWriter(new File(resultPath)));

            //   key为聚簇中心ID
            IntWritable key = new IntWritable();
            WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();

            while (reader.next(key, value)) {
                //   得到向量
                Vector vector = value.getVector();
                String vectorValue = "";
                //   将向量各个维度拼接成一行,用\t分隔
                for (int i = 0; i < vector.size(); i++) {
                    if (i == vector.size() - 1) {
                        vectorValue += vector.get(i);
                    } else {
                        vectorValue += vector.get(i) + "\t";
                    }
                }
                bw.write(key.toString() + "\t" + vectorValue + "\n\n");
            }

            bw.flush();
            reader.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在ClusterOutput类上右键执行程序

 执行结果被保存在/home/data/result.txt文件中,打开终端执行以下命令

     6.评估聚类效果:

  • 编写InterClusterDistances类,计算平均簇间距离
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InterClusterDistances {
    private static final String HDFS = "hdfs://master:9000";

    public static void main(String[] args) throws Exception {
        String inputFile = HDFS + "/user/hdfs/kmeans/output";
        System.out.println("聚类结果文件地址:" + inputFile);

        Configuration conf = new Configuration();
        Path path = new Path(inputFile + "/clusters-2-final/part-r-00000");
        System.out.println("Input Path:" + path);

        FileSystem fs = FileSystem.get(path.toUri(), conf);
        List<Cluster> clusters = new ArrayList<Cluster>();

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        Writable key = (Writable) reader.getKeyClass().newInstance();
        ClusterWritable value = (ClusterWritable) reader.getValueClass().newInstance();

        while (reader.next(key, value)) {
            Cluster cluster = value.getValue();
            clusters.add(cluster);
            value = (ClusterWritable) reader.getValueClass().newInstance();
        }

        System.out.println("Cluster In Total:" + clusters.size());

        DistanceMeasure measure = new EuclideanDistanceMeasure();
        double max = 0;
        double min = Double.MAX_VALUE;
        double sum = 0;
        int count = 0;
        Set<Double> total = new HashSet<Double>();

        // 如果聚类的个数大于1才开始计算
        if (clusters.size() != 1 && clusters.size() != 0) {
            for (int i = 0; i < clusters.size(); i++) {
                for (int j = 0; j < clusters.size(); j++) {
                    double d = measure.distance(clusters.get(i).getCenter(), clusters.get(j).getCenter());
                    min = Math.min(d, min);
                    max = Math.max(d, max);
                    total.add(d);
                    sum += d;
                    count++;
                }
            }

            System.out.println("Maximum Intercluster Distance:" + max);
            System.out.println("Minimum Intercluster Distance:" + min);
            System.out.println("Average Intercluster Distance:" + sum / count);
            for (double d : total) {
                System.out.print("[" + d + "] ");
            }

        } else if (clusters.size() == 1) {
            System.out.println("只有一个类,无法判断聚类质量");
        } else if (clusters.size() == 0) {
            System.out.println("聚类失败");
        }
    }
}

同样右键执行程序,得到下图结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/427357.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

解析社交媒体二维码生成:连接世界,拓展社交网络

随着社交媒体的普及和二维码技术的发展&#xff0c;社交媒体二维码作为一种新型的社交工具&#xff0c;逐渐受到人们的关注和喜爱。本文将探讨社交媒体二维码的定义、应用场景以及优势所在。 1. 什么是社交媒体二维码? 社交媒体二维码是将个人或企业在社交媒体平台上的信息整…

JavaSE-09(Java IO精华总结)

Java IO 简单做个总结&#xff1a; 1 .InputStream/OutputStream 字节流的抽象类。2 .Reader/Writer 字符流的抽象类。3 .FileInputStream/FileOutputStream 节点流&#xff1a;以字节为单位直接操作“文件”。4 .ByteArrayInputStream/ByteArrayOutputStream 节点流&#xff…

现在如何才能开通微信公众号留言功能?

为什么公众号没有留言功能&#xff1f;2018年2月12日之后直到现在&#xff0c;新注册公众号的运营者会发现一个问题&#xff1a;无论是个人还是企业的公众号&#xff0c;在后台都找不到留言功能了。这对公众号来说绝对是一个极差的体验&#xff0c;少了一个这么重要的功能&…

4. 编写app组件

1. 代码 main.ts // 引入createApp用于创建应用 import {createApp} from "vue"// 引入App根组件 import App from ./App.vue createApp(App).mount(#app) App.vue <!-- vue文件可以写三种标签1. template标签&#xff0c;写html结构2. script 脚本标签&…

Python实现向量自回归移动平均模型(VARMA算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 向量自回归移动平均模型&#xff08;Vector Autoregressive Moving Average, VARMA&#xff09;是一种…

GEE:使用ReLu激活函数对单波段图像进行变换(以NDVI为例)

作者:CSDN @ _养乐多_ 本文将介绍在 Google Earth Engine (GEE)平台上,对任意单波段影像进行 ReLu 变换的代码。并以对 NDVI 影像像素值的变换为例。 文章目录 一、ReLu激活函数1.1 什么是 ReLu 激活函数1.2 用到遥感图像上有什么用?二、代码链接三、完整代码一、ReLu激活…

电脑端微信无法打开公众号/小程序?

生气了。怎么动不动电脑端微信就打不开公众号/小程序&#xff0c;每次都要搞好久 第一步&#xff1a;打开控制面板、网络和Internet&#xff0c;选中Internet选项。 第二步&#xff1a;选中连接选项&#xff0c;并打开下方的局域网设置。 第三步&#xff1a;取消 为LAN使用代理…

横幅条(自定义控件)的编写

常可以看见ViewPager翻页视图下有几个圆点&#xff0c;并随着图片变化而变化。 我们称之为横幅条&#xff0c;横幅条自定义控件的编写有两种&#xff1a;(1)使用Paint与Canvas绘制&#xff1b;(2)使用RadioButton组合。 第一种编写方法的优势是可以显示滑动过程中的位置&#…

大模型之SORA技术学习

文章目录 sora的技术原理文字生成视频过程sora的技术优势量大质优的视频预训练库算力多&#xff0c;采样步骤多&#xff0c;更精细。GPT解释力更强&#xff0c;提示词(Prompt&#xff09;表现更好 使用场景参考 Sora改变AI认知方式&#xff0c;开启走向【世界模拟器】的史诗级的…

Arduino应用开发——使用GUI-Guider制作LVGL UI并导入ESP32运行

Arduino应用开发——使用GUI-Guider制作LVGL UI并导入ESP32运行 目录 Arduino应用开发——使用GUI-Guider制作LVGL UI并导入ESP32运行前言1 使用GUI-Guider设计UI1.1 创建工程1.2 设计UI 2 ESP工程导入UI2.1 移植LVGL2.2 移植UI文件2.3 调用UI文件2.4 烧录测试 结束语 前言 GU…

(UE4升级UE5)Selected Level Actor节点升级到UE5

本问所用工具为&#xff1a; UE5 UE4 插件AssetDeveTool包含&#xff1a;快速选择功能自动化批量LOD功能自动化批量展UV功能自动化批量减面功能自动化批量修改查找替换材质功能批量重命名工具碰撞器修改工具资源整理工具支持4.26 - 5.3版本https://mbd.pub/o/bread/mbd-ZZubkp…

Manomotion 实现AR手势互动-解决手势无效的问题

之前就玩过 Manomotion &#xff0c;现在有新需求&#xff0c;重新接入发现不能用了&#xff0c;不管什么办法&#xff0c;都识别不了手势&#xff0c;我记得当初是直接调用就可以的。 经过研究发现&#xff0c;新版本SDK改了写法。下边就写一下新版本的调用&#xff0c;并且实…

Windows如何安装docker-desktop

下载 docker-desktop设置环境安装wsl可能遇到的错误 下载 docker-desktop 下载官网&#xff1a;https://www.docker.com/products/docker-desktop/ 设置环境 如果没有Hyper-V选项的,按照以下步骤 添加一个文件Hyper-V.bat 添加以下内容,并双击运行后重启电脑 pushd "%~…

Android sutdio 4.1.2版本Gradle 构建和打包慢解决方法,亲测有效

1在设置里面的Gradle 找到这个目录 进入后 新建文件&#xff0c; gradle.properties 输入设置 并保存 org.gradle.daemontrue 项目第一次加载构建过程比较慢&#xff0c;需要等&#xff0c;完成后&#xff0c;修改下面的配置 gradle-3.3-all.zip 这个文件可以先提前下载好&am…

AI日报:埃隆·马斯克起诉OpenAI

埃隆马斯克&#xff08;ElonMusk&#xff09;正在起诉OpenAI涉嫌违约&#xff0c;声称这位ChatGPT的创建者违反了其成为非营利组织的创始承诺&#xff0c;这位科技亿万富翁表示&#xff0c;他资助并培育了这一承诺。 在一份长达46页的爆炸性投诉中&#xff0c;马斯克将OpenAI首…

分布式ID生成算法|雪花算法 Snowflake | Go实现

写在前面 在分布式领域中&#xff0c;不可避免的需要生成一个全局唯一ID。而在近几年的发展中有许多分布式ID生成算法&#xff0c;比较经典的就是 Twitter 的雪花算法(Snowflake Algorithm)。当然国内也有美团的基于snowflake改进的Leaf算法。那么今天我们就来介绍一下雪花算法…

图书管理系统的设计与实现

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 一 、设计说明 1.1 课题…

数据结构之树结构(下)

各种各样的大树 平衡二叉树 (AVL树) 普通二叉树存在的问题 左子树全部为空&#xff0c;从形式上看&#xff0c;更像一个单链表 插入速度没有影响 查询速度明显降低&#xff08;因为需要依次比较&#xff09;&#xff0c;不能发挥BST的优势&#xff0c;因为每次还需要比较左子…

Unity 脚本-生命周期常用函数

在Unity中&#xff0c;万物皆是由组件构成的。 右键创建C&#xff03;脚本&#xff0c;拖动脚本到某物体的组件列表。 生命周期相关函数 using System.Collections; using System.Collections.Generic; using UnityEngine;// 必须要继承 MonoBehaviour 才是一个组件 // 类名…

AirPods Pro 2 耳机推送新固件,苹果Find My功能助力产品成长

苹果公司面向 AirPods Pro 2&#xff08;包括 USB-C 和 Lightning 版本&#xff09;&#xff0c;推出了全新的测试版固件更新&#xff0c;版本号为 6E188&#xff0c;高于 12 月份发布的 6B34 固件。 苹果和往常一样&#xff0c;并没有提供详细的更新日志或者说明&#xff0c…