Usage of Canopy Clustering (Apache Mahout) in Java

Maven Dependencies:

<!-- https://mvnrepository.com/artifact/org.apache.mahout/mahout-mr -->
<dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-mr</artifactId>
    <version>0.13.0</version>
</dependency>

<dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-examples</artifactId>
    <version>0.13.0</version>
</dependency>
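
The InputDriver.runJob calls used below expect plain text input files, one data point per line, with the vector's numeric values separated by whitespace; the driver converts them into SequenceFiles of the vector class you name (RandomAccessSparseVector here). A minimal sketch for generating such a file, using a hypothetical GenerateTestData helper and the assumed path testdata/points.txt:

import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class GenerateTestData {
    public static void main(String[] args) throws IOException {
        Files.createDirectories(Paths.get("testdata"));
        try (FileWriter writer = new FileWriter("testdata/points.txt")) {
            // Three loose groups of 2-D points, one point per line,
            // values separated by whitespace as InputDriver expects.
            writer.write("1.0 1.1\n1.2 0.9\n");
            writer.write("5.0 5.2\n4.8 5.1\n");
            writer.write("9.0 9.3\n9.1 8.8\n");
        }
    }
}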

Code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.conversion.InputDriver;
import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.distance.*;
import org.apache.mahout.utils.clustering.ClusterDumper;

import java.io.IOException;
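
// Extra imports used only by the illustrative sketch methods below.
import java.util.ArrayList;
import java.util.List;

import org.apache.mahout.math.Vector;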

public class Mahout {

    /**
     * Clustering by k-means; the number of clusters k must be specified.
     * <p>
     * Parameters:
     * 1. Path input: path of the data points to be clustered
     * 2. Path output: storage path for the clustering results
     * 3. DistanceMeasure measure: distance measure to use, default SquaredEuclidean
     * 4. double convergenceDelta: convergence threshold; iteration stops once no cluster center moves farther than this between two consecutive iterations, default 0.001
     * 5. int maxIterations: maximum number of iterations; iteration stops as soon as either this limit or the convergence delta is reached
     * 6. boolean runClustering: if true (default), after the cluster centers are computed, assign each data point to its cluster; if false, stop after computing the centers
     * 7. boolean runSequential: if true, run in memory on a single machine; if false (default), run as MapReduce jobs
     * 8. int k: number of clusters
     */
    public void kmeansRun() {
        try {
            Path output = new Path("runtime-node/processor-node/src/test/java/com/quark/datastream/runtime/task/ml_svm/LibSvmTest/output");
            Path input = new Path("runtime-node/processor-node/src/test/java/com/quark/datastream/runtime/task/ml_svm/LibSvmTest/testdata");
            Configuration conf = new Configuration();
            HadoopUtil.delete(conf, new Path[]{output});
            int k = 3;
            Double convergenceDelta = 0.5D;
            DistanceMeasure measure = new EuclideanDistanceMeasure();
            int maxIterations = 10;

            Path directoryContainingConvertedInput = new Path(output, "data");
            System.out.println("Preparing Input");
            InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
            System.out.println("Running random seed to get initial clusters");
            Path clusters = new Path(output, "random-seeds");
            clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
            System.out.println("Running KMeans with k = " + k);
            KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, convergenceDelta, maxIterations, false, 0.0D, false);
            Path outGlob = new Path(output, "clusters-*-final");
            Path clusteredPoints = new Path(output, "clusteredPoints");
            System.out.println("Dumping out clusters from clusters: " + outGlob + " and clusteredPoints:" + clusteredPoints);
            ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
            String[] args = {
                    "-i", outGlob.toString(),
                    "-o", "points",
//                    "-p", clusteredPoints.toString(),
//                    "-of", "TEXT"
            };
            clusterDumper.run(args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
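
    // The convergenceDelta parameter above controls the k-means stopping test:
    // iteration stops once no cluster center moves farther than convergenceDelta
    // between two consecutive iterations (or once maxIterations is reached).
    // A minimal single-machine sketch of that test over in-memory centers;
    // an illustration of the idea, not Mahout's internal implementation:
    private static boolean hasConverged(List<Vector> oldCenters, List<Vector> newCenters,
                                        DistanceMeasure measure, double convergenceDelta) {
        for (int i = 0; i < oldCenters.size(); i++) {
            // If any center moved farther than convergenceDelta, keep iterating.
            if (measure.distance(oldCenters.get(i), newCenters.get(i)) > convergenceDelta) {
                return false;
            }
        }
        return true;
    }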

    /**
     * Run Canopy alone to compute an initial k (the number of canopies) and the center coordinates, without any follow-up clustering.
     * <p>
     * Parameters:
     * 1. Path input: path of the data points to be clustered
     * 2. Path output: storage path for the clustering results
     * 3. DistanceMeasure measure: distance measure to use, default SquaredEuclidean
     * 4. double t1: weak-binding distance (t1 > t2); a point whose distance to a canopy center is less than t1 is marked weakly bound and added to that canopy
     * 5. double t2: strong-binding distance; a point whose distance to a canopy center is less than t2 is marked strongly bound. A point that is not strongly bound to any existing canopy becomes the center of a new canopy
     * 6. boolean runClustering: if true (default), after the centers are computed, assign each data point to its cluster; if false, stop after computing the centers
     * 7. boolean runSequential: if true, run in memory on a single machine; if false (default), run as MapReduce jobs
     *
     * @throws Exception
     */
    public void canopyRun() throws Exception {
        Path output = new Path("output");
        Path input = new Path("testdata");
        Configuration conf = new Configuration();
        HadoopUtil.delete(conf, new Path[]{output});
        DistanceMeasure measure = new EuclideanDistanceMeasure();
        Double t1 = 2.0D;
        Double t2 = 1.0D;

        Path directoryContainingConvertedInput = new Path(output, "data");
        System.out.println("Preparing Input");
        InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
        System.out.println("Running Canopy to get initial clusters");
        CanopyDriver.run(conf, directoryContainingConvertedInput, output, measure, t1, t2, false, 0.0D, false);
        Path outGlob = new Path(output, "clusters-*-final");
        Path clusteredPoints = new Path(output, "clusteredPoints");
        ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
        String[] args = {
                "-i", outGlob.toString(),
                "-o", "points",
//                "-p", clusteredPoints.toString(),  // runClustering is false above, so no clusteredPoints are written
                "-of", "TEXT"
        };
        clusterDumper.run(args);
    }
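
    // A minimal single-machine sketch of the t1/t2 canopy rule documented above,
    // over an in-memory list of points (an illustration of the idea, not a drop-in
    // replacement for CanopyDriver): a point within t1 of a center joins that
    // canopy; a point within t2 of some center is strongly bound and never seeds
    // a new canopy; any point not strongly bound becomes a new canopy center.
    private static List<Vector> canopyCenters(List<Vector> points, DistanceMeasure measure,
                                              double t1, double t2) {
        List<Vector> centers = new ArrayList<>();
        List<List<Vector>> canopies = new ArrayList<>();
        for (Vector point : points) {
            boolean stronglyBound = false;
            for (int i = 0; i < centers.size(); i++) {
                double d = measure.distance(centers.get(i), point);
                if (d < t1) {
                    canopies.get(i).add(point); // weakly bound: covered by this canopy
                }
                if (d < t2) {
                    stronglyBound = true;       // strongly bound: will not seed a new canopy
                }
            }
            if (!stronglyBound) {
                centers.add(point);             // the point becomes a new canopy center
                List<Vector> members = new ArrayList<>();
                members.add(point);
                canopies.add(members);
            }
        }
        return centers; // centers.size() is the k that canopy hands to k-means
    }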

    /**
     * Clustering data points by fuzzy k-means.
     * <p>
     * Parameters:
     * 1. Path input: path of the data points to be clustered
     * 2. Path output: storage path for the clustering results
     * 3. DistanceMeasure measure: distance measure to use, default SquaredEuclidean
     * 4. float fuzziness: fuzziness parameter m, in (1, 2]; the larger m is, the fuzzier the cluster memberships
     * 5. double convergenceDelta: convergence threshold; iteration stops once no cluster center moves farther than this between two consecutive iterations, default 0.001
     * 6. int maxIterations: maximum number of iterations; iteration stops as soon as either this limit or the convergence delta is reached
     * 7. boolean runClustering: if true (default), after the cluster centers are computed, assign each data point to its cluster; if false, stop after computing the centers
     * 8. boolean emitMostLikely: if true, emit each point only to its most likely cluster; if false, emit it to every cluster with its membership weight
     * 9. boolean runSequential: if true, run in memory on a single machine; if false (default), run as MapReduce jobs
     *
     * @throws Exception
     */
    public void fuzzyKMeansRun() throws Exception {

        Path output = new Path("output");
        Path input = new Path("testdata");
        Configuration conf = new Configuration();
        HadoopUtil.delete(conf, new Path[]{output});
        Double convergenceDelta = 0.5D;
        DistanceMeasure measure = new SquaredEuclideanDistanceMeasure();
        int maxIterations = 10;
        float fuzziness = 2.0F;
        int k = 3;

        Path directoryContainingConvertedInput = new Path(output, "data");
        System.out.println("Preparing Input");
        InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
        Path clusters = new Path(output, "random-seeds");
        clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
        FuzzyKMeansDriver.run(directoryContainingConvertedInput, clusters, output, convergenceDelta, maxIterations, fuzziness, true, true, 0.0D, false);
        Path outGlob = new Path(output, "clusters-*-final");
        Path clusteredPoints = new Path(output, "clusteredPoints");
        System.out.println("Dumping out clusters from clusters: " + outGlob + " and clusteredPoints:" + clusteredPoints);
        ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
        String[] args = {
                "-i", "output/clusters-*-final",
                "-o", "points",
                "-p", "output/clusteredPoints",
                "-of", "JSON"
        };
        clusterDumper.run(args);
    }
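
    // The fuzziness parameter m above controls how soft the memberships are. A
    // minimal sketch of the standard fuzzy k-means membership formula,
    // u_i = 1 / sum_j (d_i / d_j)^(2 / (m - 1)), for one point against all
    // centers (an illustration of the formula, not Mahout's internal code;
    // it assumes the point does not coincide exactly with any center):
    private static double[] fuzzyMemberships(Vector point, List<Vector> centers,
                                             DistanceMeasure measure, float m) {
        double[] u = new double[centers.size()];
        for (int i = 0; i < centers.size(); i++) {
            double di = measure.distance(point, centers.get(i));
            double sum = 0.0;
            for (Vector other : centers) {
                double dj = measure.distance(point, other);
                sum += Math.pow(di / dj, 2.0 / (m - 1.0)); // larger m flattens the memberships
            }
            u[i] = 1.0 / sum; // memberships over all centers sum to 1
        }
        return u;
    }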

    /**
     * Use Canopy to obtain k and the initial centers, then cluster with k-means.
     * Canopy writes its centers to canopies/clusters-0-final, which is passed to
     * KMeansDriver as the initial clusters, so k is simply the number of canopies found.
     * <p>
     * Parameters:
     * 1. Path input: path of the data points to be clustered
     * 2. Path output: storage path for the clustering results
     * 3. DistanceMeasure measure: distance measure to use, default SquaredEuclidean
     * 4. double t1: weak-binding distance (t1 > t2); a point whose distance to a canopy center is less than t1 is marked weakly bound and added to that canopy
     * 5. double t2: strong-binding distance; a point whose distance to a canopy center is less than t2 is marked strongly bound. A point that is not strongly bound to any existing canopy becomes the center of a new canopy
     * 6. double convergenceDelta: convergence threshold; iteration stops once no cluster center moves farther than this between two consecutive iterations, default 0.001
     * 7. int maxIterations: maximum number of iterations; iteration stops as soon as either this limit or the convergence delta is reached
     * 8. boolean runClustering: if true (default), after the cluster centers are computed, assign each data point to its cluster; if false, stop after computing the centers
     * 9. boolean runSequential: if true, run in memory on a single machine; if false (default), run as MapReduce jobs
     *
     * @throws Exception
     */
    public void canopyKMeansRun() throws Exception {

        Path output = new Path("output");
        Path input = new Path("testdata");
        Configuration conf = new Configuration();
        HadoopUtil.delete(conf, new Path[]{output});
        Double convergenceDelta = 0.5D;
        DistanceMeasure measure = new SquaredEuclideanDistanceMeasure();
        int maxIterations = 10;
        Double t1 = 2.0D;
        Double t2 = 1.0D;

        Path directoryContainingConvertedInput = new Path(output, "data");
        System.out.println("Preparing Input");
        InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
        System.out.println("Running Canopy to get initial clusters");
        Path canopyOutput = new Path(output, "canopies");
        CanopyDriver.run(new Configuration(), directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0D, false);
        System.out.println("Running KMeans");
        KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(canopyOutput, "clusters-0-final"), output, convergenceDelta, maxIterations, true, 0.0D, false);
        Path outGlob = new Path(output, "clusters-*-final");
        Path clusteredPoints = new Path(output, "clusteredPoints");
        System.out.println("Dumping out clusters from clusters: " + outGlob + " and clusteredPoints:" + clusteredPoints);
        ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
        String[] args = {
                "-i", "output/clusters-*-final",
                "-o", "points",
                "-p", "output/clusteredPoints",
                "-of", "TEXT"
        };
        clusterDumper.run(args);
    }

    /**
     * Use Canopy to obtain k and the initial centers, then cluster with fuzzy k-means.
     * <p>
     * Parameters:
     * 1. Path input: path of the data points to be clustered
     * 2. Path output: storage path for the clustering results
     * 3. DistanceMeasure measure: distance measure to use, default SquaredEuclidean
     * 4. double t1: weak-binding distance (t1 > t2); a point whose distance to a canopy center is less than t1 is marked weakly bound and added to that canopy
     * 5. double t2: strong-binding distance; a point whose distance to a canopy center is less than t2 is marked strongly bound. A point that is not strongly bound to any existing canopy becomes the center of a new canopy
     * 6. float fuzziness: fuzziness parameter m, in (1, 2]; the larger m is, the fuzzier the cluster memberships
     * 7. double convergenceDelta: convergence threshold; iteration stops once no cluster center moves farther than this between two consecutive iterations, default 0.001
     * 8. int maxIterations: maximum number of iterations; iteration stops as soon as either this limit or the convergence delta is reached
     * 9. boolean runClustering: if true (default), after the cluster centers are computed, assign each data point to its cluster; if false, stop after computing the centers
     * 10. boolean emitMostLikely: if true, emit each point only to its most likely cluster; if false, emit it to every cluster with its membership weight
     * 11. boolean runSequential: if true, run in memory on a single machine; if false (default), run as MapReduce jobs
     *
     * @throws Exception
     */
    public void canopyFuzzyKMeansRun() throws Exception {

        Path output = new Path("output");
        Path input = new Path("testdata");
        Configuration conf = new Configuration();
        HadoopUtil.delete(conf, new Path[]{output});
        Double convergenceDelta = 0.5D;
        DistanceMeasure measure = new ManhattanDistanceMeasure();
        int maxIterations = 10;
        double t1 = 2.0D;
        double t2 = 1.0D;
        float fuzziness = 2.0F;

        Path directoryContainingConvertedInput = new Path(output, "data");
        System.out.println("Preparing Input");
        InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
        System.out.println("Running Canopy to get initial clusters");
        Path canopyOutput = new Path(output, "canopies");
        CanopyDriver.run(new Configuration(), directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0D, false);
        System.out.println("Running KMeans");
        FuzzyKMeansDriver.run(directoryContainingConvertedInput, new Path(canopyOutput, "clusters-0-final"), output, convergenceDelta, maxIterations, fuzziness, true, true, 0.0D, false);
        Path outGlob = new Path(output, "clusters-*-final");
        Path clusteredPoints = new Path(output, "clusteredPoints");
        System.out.println("Dumping out clusters from clusters: " + outGlob + " and clusteredPoints:" + clusteredPoints);
        ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
        String[] args = {
                "-i", "output/clusters-*-final",
                "-o", "points",
                "-p", "output/clusteredPoints",
                "-of", "TEXT"
        };
        clusterDumper.run(args);
    }

    /**
     * Format and dump the clustering output
     */
    public void dumpTest() throws Exception {

        Path output = new Path("output");
        Path outGlob = new Path(output, "clusters-*-final");
        Path clusteredPoints = new Path(output, "clusteredPoints");
        ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
        String[] args = {
                "-i", "output/clusters-*-final",
                "-o", "points",
//                "-p", "output/clusteredPoints",    // directory of sequence files mapping input vectors to their cluster
                "-of", "TEXT", // optional output format: TEXT, CSV, JSON or GRAPH_ML
//                "-sp", "3",  // maximum number of points to include per cluster
//                "-b", "100", // number of characters of each cluster's asFormatString() to print
//                "-n", "0",   // number of top terms to print (only meaningful for text features)
//                "endPhase"
        };
        clusterDumper.run(args);
//        ClusterDumper.main(args);
    }

    public static void main(String[] args) throws Exception {
        Mahout clusterNode = new Mahout();
//        clusterNode.kmeansRun();
//        clusterNode.canopyRun();
        clusterNode.fuzzyKMeansRun();
//        clusterNode.canopyKMeansRun();
//        clusterNode.canopyFuzzyKMeansRun();
//        clusterNode.dumpTest();
    }
}
