NumPyML 源码解析(七)

numpy-ml\numpy_ml\trees\gbdt.py

# 导入 numpy 库并重命名为 np
import numpy as np

# 从当前目录下的 dt 模块中导入 DecisionTree 类
# 从当前目录下的 losses 模块中导入 MSELoss 和 CrossEntropyLoss 类
from .dt import DecisionTree
from .losses import MSELoss, CrossEntropyLoss

# 定义一个函数,将标签转换为 one-hot 编码
def to_one_hot(labels, n_classes=None):
    # 如果标签的维度大于 1,则抛出异常
    if labels.ndim > 1:
        raise ValueError("labels must have dimension 1, but got {}".format(labels.ndim))

    # 获取标签的数量
    N = labels.size
    # 计算 one-hot 编码的列数
    n_cols = np.max(labels) + 1 if n_classes is None else n_classes
    # 创建一个全零矩阵,用于存储 one-hot 编码
    one_hot = np.zeros((N, n_cols))
    # 将对应位置的值设为 1.0,实现 one-hot 编码
    one_hot[np.arange(N), labels] = 1.0
    return one_hot

# 定义一个梯度提升决策树的类
class GradientBoostedDecisionTree:
    def __init__(
        self,
        n_iter,
        max_depth=None,
        classifier=True,
        learning_rate=1,
        loss="crossentropy",
        step_size="constant",
    # 定义一个预测方法,用于对输入数据进行分类或预测
    def predict(self, X):
        """
        Use the trained model to classify or predict the examples in `X`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            The training data of `N` examples, each with `M` features

        Returns
        -------
        preds : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            The integer class labels predicted for each example in `X` if
            ``self.classifier = True``, otherwise the predicted target values.
        """
        # 初始化预测结果矩阵
        Y_pred = np.zeros((X.shape[0], self.out_dims))
        # 遍历每个迭代器
        for i in range(self.n_iter):
            # 遍历每个输出维度
            for k in range(self.out_dims):
                # 根据权重和学习器的预测结果更新预测结果矩阵
                Y_pred[:, k] += self.weights[i, k] * self.learners[i, k].predict(X)

        # 如果是分类器,则取预测结果矩阵中每行最大值的索引作为预测结果
        if self.classifier:
            Y_pred = Y_pred.argmax(axis=1)

        return Y_pred

numpy-ml\numpy_ml\trees\losses.py

import numpy as np
# 导入 NumPy 库

#######################################################################
#                           Base Estimators                           #
#######################################################################

# 定义 ClassProbEstimator 类,用于计算类别概率
class ClassProbEstimator:
    # 训练函数,计算类别概率
    def fit(self, X, y):
        self.class_prob = y.sum() / len(y)

    # 预测函数,返回类别概率
    def predict(self, X):
        pred = np.empty(X.shape[0], dtype=np.float64)
        pred.fill(self.class_prob)
        return pred

# 定义 MeanBaseEstimator 类,用于计算均值
class MeanBaseEstimator:
    # 训练函数,计算均值
    def fit(self, X, y):
        self.avg = np.mean(y)

    # 预测函数,返回均值
    def predict(self, X):
        pred = np.empty(X.shape[0], dtype=np.float64)
        pred.fill(self.avg)
        return pred

#######################################################################
#                           Loss Functions                            #
#######################################################################

# 定义 MSELoss 类,用于计算均方误差
class MSELoss:
    # 计算均方误差
    def __call__(self, y, y_pred):
        return np.mean((y - y_pred) ** 2)

    # 返回基本估计器
    def base_estimator(self):
        return MeanBaseEstimator()

    # 计算梯度
    def grad(self, y, y_pred):
        return -2 / len(y) * (y - y_pred)

    # 线搜索函数
    def line_search(self, y, y_pred, h_pred):
        # TODO: revise this
        Lp = np.sum((y - y_pred) * h_pred)
        Lpp = np.sum(h_pred * h_pred)

        # if we perfectly fit the residuals, use max step size
        return 1 if np.sum(Lpp) == 0 else Lp / Lpp

# 定义 CrossEntropyLoss 类,用于计算交叉熵损失
class CrossEntropyLoss:
    # 计算交叉熵损失
    def __call__(self, y, y_pred):
        eps = np.finfo(float).eps
        return -np.sum(y * np.log(y_pred + eps))

    # 返回基本估计器
    def base_estimator(self):
        return ClassProbEstimator()

    # 计算梯度
    def grad(self, y, y_pred):
        eps = np.finfo(float).eps
        return -y * 1 / (y_pred + eps)

    # 线搜索函数
    def line_search(self, y, y_pred, h_pred):
        raise NotImplementedError

Tree-Based Models

This module implements:

  1. Decision trees for classification and regression via the CART algorithm (Breiman, Friedman, Olshen, & Stone, 1984).
  2. Random forests for classification and regression (an example of bootstrap
    aggregating) (Breiman, 2001).
  3. Gradient-boosted decision
    trees for classification and regression (an
    example of gradient boosted machines) (Friedman, 1999/2001).

Plots

numpy-ml\numpy_ml\trees\rf.py

# 导入 numpy 库并重命名为 np
import numpy as np
# 从当前目录下的 dt 模块中导入 DecisionTree 类
from .dt import DecisionTree

# 定义一个函数 bootstrap_sample,用于生成数据集的自助采样
def bootstrap_sample(X, Y):
    # 获取数据集 X 的行数和列数
    N, M = X.shape
    # 从 0 到 N 中随机选择 N 个数,允许重复
    idxs = np.random.choice(N, N, replace=True)
    # 返回自助采样后的数据集 X 和 Y
    return X[idxs], Y[idxs]

# 定义一个类 RandomForest,表示随机森林模型
class RandomForest:
    def __init__(
        self, n_trees, max_depth, n_feats, classifier=True, criterion="entropy"
    ):
        """
        An ensemble (forest) of decision trees where each split is calculated
        using a random subset of the features in the input.

        Parameters
        ----------
        n_trees : int
            The number of individual decision trees to use within the ensemble.
        max_depth: int or None
            The depth at which to stop growing each decision tree. If None,
            grow each tree until the leaf nodes are pure.
        n_feats : int
            The number of features to sample on each split.
        classifier : bool
            Whether `Y` contains class labels or real-valued targets. Default
            is True.
        criterion : {'entropy', 'gini', 'mse'}
            The error criterion to use when calculating splits for each weak
            learner. When ``classifier = False``, valid entries are {'mse'}.
            When ``classifier = True``, valid entries are {'entropy', 'gini'}.
            Default is 'entropy'.
        """
        # 初始化随机森林模型的属性
        self.trees = []
        self.n_trees = n_trees
        self.n_feats = n_feats
        self.max_depth = max_depth
        self.criterion = criterion
        self.classifier = classifier
    def fit(self, X, Y):
        """
        Create `n_trees`-worth of bootstrapped samples from the training data
        and use each to fit a separate decision tree.
        """
        # 初始化一个空列表用于存放决策树
        self.trees = []
        # 循环创建n_trees个决策树
        for _ in range(self.n_trees):
            # 从训练数据中创建一个自助采样样本
            X_samp, Y_samp = bootstrap_sample(X, Y)
            # 创建一个决策树对象
            tree = DecisionTree(
                n_feats=self.n_feats,
                max_depth=self.max_depth,
                criterion=self.criterion,
                classifier=self.classifier,
            )
            # 使用自助采样样本拟合决策树
            tree.fit(X_samp, Y_samp)
            # 将拟合好的决策树添加到列表中
            self.trees.append(tree)

    def predict(self, X):
        """
        Predict the target value for each entry in `X`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            The training data of `N` examples, each with `M` features.

        Returns
        -------
        y_pred : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            Model predictions for each entry in `X`.
        """
        # 对每个决策树在输入数据X上进行预测
        tree_preds = np.array([[t._traverse(x, t.root) for x in X] for t in self.trees])
        # 对每个决策树的预测结果进行投票,返回最终的预测结果
        return self._vote(tree_preds)
    # 对于每个问题,返回随机森林中所有树的预测结果的聚合值
    def _vote(self, predictions):
        """
        Return the aggregated prediction across all trees in the RF for each problem.

        Parameters
        ----------
        predictions : :py:class:`ndarray <numpy.ndarray>` of shape `(n_trees, N)`
            The array of predictions from each decision tree in the RF for each
            of the `N` problems in `X`.

        Returns
        -------
        y_pred : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            If classifier is True, the class label predicted by the majority of
            the decision trees for each problem in `X`. If classifier is False,
            the average prediction across decision trees on each problem.
        """
        # 如果是分类器,则对每个问题返回大多数决策树预测的类标签
        if self.classifier:
            out = [np.bincount(x).argmax() for x in predictions.T]
        # 如果不是分类器,则对每个问题返回决策树预测的平均值
        else:
            out = [np.mean(x) for x in predictions.T]
        # 将结果转换为数组并返回
        return np.array(out)

numpy-ml\numpy_ml\trees\__init__.py

# 从当前目录中导入 losses 模块
from . import losses
# 从当前目录中导入 dt 模块中的所有内容
from .dt import *
# 从当前目录中导入 rf 模块中的所有内容
from .rf import *
# 从当前目录中导入 gbdt 模块中的所有内容
from .gbdt import *

numpy-ml\numpy_ml\utils\data_structures.py

# 导入 heapq 模块,用于实现优先队列
import heapq
# 从 copy 模块中导入 copy 函数
from copy import copy
# 从 collections 模块中导入 Hashable 类
from collections import Hashable
# 导入 numpy 模块,并使用 np 别名
import numpy as np
# 从当前目录下的 distance_metrics 模块中导入 euclidean 函数
from .distance_metrics import euclidean

#######################################################################
#                           Priority Queue                            #
#######################################################################

# 定义优先队列节点类
class PQNode(object):
    def __init__(self, key, val, priority, entry_id, **kwargs):
        """A generic node object for holding entries in :class:`PriorityQueue`"""
        # 初始化节点对象的属性
        self.key = key
        self.val = val
        self.entry_id = entry_id
        self.priority = priority

    def __repr__(self):
        # 返回节点对象的字符串表示
        fstr = "PQNode(key={}, val={}, priority={}, entry_id={})"
        return fstr.format(self.key, self.val, self.priority, self.entry_id)

    def to_dict(self):
        """Return a dictionary representation of the node's contents"""
        # 返回节点对象的字典表示
        d = self.__dict__
        d["id"] = "PQNode"
        return d

    def __gt__(self, other):
        # 实现大于比较操作符
        if not isinstance(other, PQNode):
            return -1
        if self.priority == other.priority:
            return self.entry_id > other.entry_id
        return self.priority > other.priority

    def __ge__(self, other):
        # 实现大于等于比较操作符
        if not isinstance(other, PQNode):
            return -1
        return self.priority >= other.priority

    def __lt__(self, other):
        # 实现小于比较操作符
        if not isinstance(other, PQNode):
            return -1
        if self.priority == other.priority:
            return self.entry_id < other.entry_id
        return self.priority < other.priority

    def __le__(self, other):
        # 实现小于等于比较操作符
        if not isinstance(other, PQNode):
            return -1
        return self.priority <= other.priority

# 定义优先队列类
class PriorityQueue:
    # 初始化优先队列对象
    def __init__(self, capacity, heap_order="max"):
        """
        A priority queue implementation using a binary heap.

        Notes
        -----
        A priority queue is a data structure useful for storing the top
        `capacity` largest or smallest elements in a collection of values. As a
        result of using a binary heap, ``PriorityQueue`` offers `O(log N)`
        :meth:`push` and :meth:`pop` operations.

        Parameters
        ----------
        capacity: int
            The maximum number of items that can be held in the queue.
        heap_order: {"max", "min"}
            Whether the priority queue should retain the items with the
            `capacity` smallest (`heap_order` = 'min') or `capacity` largest
            (`heap_order` = 'max') priorities.
        """
        # 检查堆序是否合法
        assert heap_order in ["max", "min"], "heap_order must be either 'max' or 'min'"
        # 初始化容量和堆序
        self.capacity = capacity
        self.heap_order = heap_order

        # 初始化优先队列列表、计数器和条目计数器
        self._pq = []
        self._count = 0
        self._entry_counter = 0

    # 返回优先队列的字符串表示
    def __repr__(self):
        fstr = "PriorityQueue(capacity={}, heap_order={}) with {} items"
        return fstr.format(self.capacity, self.heap_order, self._count)

    # 返回优先队列中元素的数量
    def __len__(self):
        return self._count

    # 返回优先队列的迭代器
    def __iter__(self):
        return iter(self._pq)
    def push(self, key, priority, val=None):
        """
        Add a new (key, value) pair with priority `priority` to the queue.

        Notes
        -----
        If the queue is at capacity and `priority` exceeds the priority of the
        item with the largest/smallest priority currently in the queue, replace
        the current queue item with (`key`, `val`).

        Parameters
        ----------
        key : hashable object
            The key to insert into the queue.
        priority : comparable
            The priority for the `key`, `val` pair.
        val : object
            The value associated with `key`. Default is None.
        """
        # 如果队列是最大堆,则将优先级取负数
        if self.heap_order == "max":
            priority = -1 * priority

        # 创建一个新的 PQNode 对象
        item = PQNode(key=key, val=val, priority=priority, entry_id=self._entry_counter)
        # 将新节点加入堆中
        heapq.heappush(self._pq, item)

        # 更新计数器
        self._count += 1
        self._entry_counter += 1

        # 当队列超过容量时,弹出队列中的元素
        while self._count > self.capacity:
            self.pop()

    def pop(self):
        """
        Remove the item with the largest/smallest (depending on
        ``self.heap_order``) priority from the queue and return it.

        Notes
        -----
        In contrast to :meth:`peek`, this operation is `O(log N)`.

        Returns
        -------
        item : :class:`PQNode` instance or None
            Item with the largest/smallest priority, depending on
            ``self.heap_order``.
        """
        # 从堆中弹出具有最大/最小优先级的元素,并将其转换为字典
        item = heapq.heappop(self._pq).to_dict()
        # 如果队列是最大堆,则将优先级取负数恢复原值
        if self.heap_order == "max":
            item["priority"] = -1 * item["priority"]
        # 更新计数器
        self._count -= 1
        return item
    # 返回具有最大/最小优先级(取决于self.heap_order)的项目,但不从队列中删除它
    def peek(self):
        """
        Return the item with the largest/smallest (depending on
        ``self.heap_order``) priority *without* removing it from the queue.

        Notes
        -----
        In contrast to :meth:`pop`, this operation is O(1).

        Returns
        -------
        item : :class:`PQNode` instance or None
            Item with the largest/smallest priority, depending on
            ``self.heap_order``.
        """
        # 初始化item为None
        item = None
        # 如果队列中有元素
        if self._count > 0:
            # 复制队列中第一个元素的字典表示
            item = copy(self._pq[0].to_dict())
            # 如果堆的顺序是最大堆
            if self.heap_order == "max":
                # 将item的优先级取反
                item["priority"] = -1 * item["priority"]
        # 返回item
        return item
# 定义 BallTreeNode 类,表示 Ball 树的节点
class BallTreeNode:
    # 初始化方法,设置节点的属性
    def __init__(self, centroid=None, X=None, y=None):
        self.left = None
        self.right = None
        self.radius = None
        self.is_leaf = False

        self.data = X
        self.targets = y
        self.centroid = centroid

    # 重写 __repr__ 方法,返回节点的字符串表示
    def __repr__(self):
        fstr = "BallTreeNode(centroid={}, is_leaf={})"
        return fstr.format(self.centroid, self.is_leaf)

    # 将节点转换为字典形式
    def to_dict(self):
        d = self.__dict__
        d["id"] = "BallTreeNode"
        return d


# 定义 BallTree 类
class BallTree:
    # 初始化 BallTree 数据结构
    def __init__(self, leaf_size=40, metric=None):
        """
        A ball tree data structure.

        Notes
        -----
        A ball tree is a binary tree in which every node defines a
        `D`-dimensional hypersphere ("ball") containing a subset of the points
        to be searched. Each internal node of the tree partitions the data
        points into two disjoint sets which are associated with different
        balls. While the balls themselves may intersect, each point is assigned
        to one or the other ball in the partition according to its distance
        from the ball's center. Each leaf node in the tree defines a ball and
        enumerates all data points inside that ball.

        Parameters
        ----------
        leaf_size : int
            The maximum number of datapoints at each leaf. Default is 40.
        metric : :doc:`Distance metric <numpy_ml.utils.distance_metrics>` or None
            The distance metric to use for computing nearest neighbors. If
            None, use the :func:`~numpy_ml.utils.distance_metrics.euclidean`
            metric. Default is None.

        References
        ----------
        .. [1] Omohundro, S. M. (1989). "Five balltree construction algorithms". *ICSI
           Technical Report TR-89-063*.
        .. [2] Liu, T., Moore, A., & Gray A. (2006). "New algorithms for efficient
           high-dimensional nonparametric classification". *J. Mach. Learn. Res.,
           7*, 1135-1158.
        """
        # 初始化根节点为 None
        self.root = None
        # 设置叶子节点最大数据点数,默认为 40
        self.leaf_size = leaf_size
        # 设置距离度量方式,如果未指定则使用欧氏距离
        self.metric = metric if metric is not None else euclidean
    def fit(self, X, y=None):
        """
        Build a ball tree recursively using the O(M log N) `k`-d construction
        algorithm.

        Notes
        -----
        Recursively divides data into nodes defined by a centroid `C` and radius
        `r` such that each point below the node lies within the hyper-sphere
        defined by `C` and `r`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            An array of `N` examples each with `M` features.
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, \*)` or None
            An array of target values / labels associated with the entries in
            `X`. Default is None.
        """
        # 分割数据集,得到中心点、左子集、右子集
        centroid, left_X, left_y, right_X, right_y = self._split(X, y)
        # 创建根节点,设置中心点和半径
        self.root = BallTreeNode(centroid=centroid)
        self.root.radius = np.max([self.metric(centroid, x) for x in X])
        # 递归构建左右子树
        self.root.left = self._build_tree(left_X, left_y)
        self.root.right = self._build_tree(right_X, right_y)

    def _build_tree(self, X, y):
        # 分割数据集,得到中心点、左子集、右子集
        centroid, left_X, left_y, right_X, right_y = self._split(X, y)

        # 如果数据集大小小于等于叶子节点大小阈值,则创建叶子节点
        if X.shape[0] <= self.leaf_size:
            leaf = BallTreeNode(centroid=centroid, X=X, y=y)
            leaf.radius = np.max([self.metric(centroid, x) for x in X])
            leaf.is_leaf = True
            return leaf

        # 创建内部节点,设置中心点和半径
        node = BallTreeNode(centroid=centroid)
        node.radius = np.max([self.metric(centroid, x) for x in X])
        # 递归构建左右子树
        node.left = self._build_tree(left_X, left_y)
        node.right = self._build_tree(right_X, right_y)
        return node
    # 将数据集 X 拆分成两部分,根据最大方差的维度进行拆分
    def _split(self, X, y=None):
        # 找到方差最大的维度
        split_dim = np.argmax(np.var(X, axis=0))

        # 沿着 split_dim 对 X 和 y 进行排序
        sort_ixs = np.argsort(X[:, split_dim])
        X, y = X[sort_ixs], y[sort_ixs] if y is not None else None

        # 在 split_dim 的中位数处划分数据
        med_ix = X.shape[0] // 2
        centroid = X[med_ix]  # , split_dim

        # 在中心点(中位数总是出现在右侧划分)处将数据分成两半
        left_X, left_y = X[:med_ix], y[:med_ix] if y is not None else None
        right_X, right_y = X[med_ix:], y[med_ix:] if y is not None else None
        return centroid, left_X, left_y, right_X, right_y

    # 使用 KNS1 算法找到球树中距离查询向量 x 最近的 k 个邻居
    def nearest_neighbors(self, k, x):
        """
        Find the `k` nearest neighbors in the ball tree to a query vector `x`
        using the KNS1 algorithm.

        Parameters
        ----------
        k : int
            The number of closest points in `X` to return
        x : :py:class:`ndarray <numpy.ndarray>` of shape `(1, M)`
            The query vector.

        Returns
        -------
        nearest : list of :class:`PQNode` s of length `k`
            List of the `k` points in `X` to closest to the query vector. The
            ``key`` attribute of each :class:`PQNode` contains the point itself, the
            ``val`` attribute contains its target, and the ``distance``
            attribute contains its distance to the query vector.
        """
        # 维护一个最大优先队列,优先级为到 x 的距离
        PQ = PriorityQueue(capacity=k, heap_order="max")
        # 调用 _knn 方法找到最近的 k 个邻居
        nearest = self._knn(k, x, PQ, self.root)
        # 计算每个邻居点到查询向量 x 的距离
        for n in nearest:
            n.distance = self.metric(x, n.key)
        return nearest
    # 使用 k 近邻算法查找最近的 k 个点
    def _knn(self, k, x, PQ, root):
        # 定义距离度量函数
        dist = self.metric
        # 计算点 x 到当前节点球体表面的距离
        dist_to_ball = dist(x, root.centroid) - root.radius
        # 计算点 x 到当前优先队列中最远邻居的距离
        dist_to_farthest_neighbor = dist(x, PQ.peek()["key"]) if len(PQ) > 0 else np.inf

        # 如果点 x 到球体表面的距离大于等于到最远邻居的距离,并且优先队列中已经有 k 个点,则直接返回当前优先队列
        if dist_to_ball >= dist_to_farthest_neighbor and len(PQ) == k:
            return PQ
        # 如果当前节点是叶子节点
        if root.is_leaf:
            # 初始化目标值列表
            targets = [None] * len(root.data) if root.targets is None else root.targets
            # 遍历当前节点的数据点和目标值
            for point, target in zip(root.data, targets):
                # 计算点 x 到当前数据点的距离
                dist_to_x = dist(x, point)
                # 如果优先队列中已经有 k 个点,并且点 x 到当前数据点的距离小于到最远邻居的距离,则将当前数据点加入优先队列
                if len(PQ) == k and dist_to_x < dist_to_farthest_neighbor:
                    PQ.push(key=point, val=target, priority=dist_to_x)
                else:
                    PQ.push(key=point, val=target, priority=dist_to_x)
        else:
            # 判断点 x 更接近左子节点还是右子节点
            l_closest = dist(x, root.left.centroid) < dist(x, root.right.centroid)
            # 递归调用_knn函数,继续在左子树中查找最近的 k 个点
            PQ = self._knn(k, x, PQ, root.left if l_closest else root.right)
            # 递归调用_knn函数,继续在右子树中查找最近的 k 个点
            PQ = self._knn(k, x, PQ, root.right if l_closest else root.left)
        # 返回最终的优先队列
        return PQ
#######################################################################
#                         Multinomial Sampler                         #
#######################################################################

# 定义一个离散采样器类
class DiscreteSampler:
    # 从概率分布`probs`中生成随机抽样整数,范围在[0, N)之间
    def __call__(self, n_samples=1):
        """
        Generate random draws from the `probs` distribution over integers in
        [0, N).

        Parameters
        ----------
        n_samples: int
            The number of samples to generate. Default is 1.

        Returns
        -------
        sample : :py:class:`ndarray <numpy.ndarray>` of shape `(n_samples,)`
            A collection of draws from the distribution defined by `probs`.
            Each sample is an int in the range `[0, N)`.
        """
        return self.sample(n_samples)

    # 从概率分布`probs`中生成随机抽样整数,范围在[0, N)之间
    def sample(self, n_samples=1):
        """
        Generate random draws from the `probs` distribution over integers in
        [0, N).

        Parameters
        ----------
        n_samples: int
            The number of samples to generate. Default is 1.

        Returns
        -------
        sample : :py:class:`ndarray <numpy.ndarray>` of shape `(n_samples,)`
            A collection of draws from the distribution defined by `probs`.
            Each sample is an int in the range `[0, N)`.
        """
        # 生成随机整数索引
        ixs = np.random.randint(0, self.N, n_samples)
        # 计算概率值
        p = np.exp(self.prob_table[ixs]) if self.log else self.prob_table[ixs]
        # 生成二项分布随机数
        flips = np.random.binomial(1, p)
        # 根据二项分布结果生成抽样
        samples = [ix if f else self.alias_table[ix] for ix, f in zip(ixs, flips)]

        # 使用递归拒绝采样来进行无重复抽样
        if not self.with_replacement:
            unique = list(set(samples))
            while len(samples) != len(unique):
                n_new = len(samples) - len(unique)
                samples = unique + self.sample(n_new).tolist()
                unique = list(set(samples))

        return np.array(samples, dtype=int)
# 定义一个名为 Dict 的字典子类
class Dict(dict):
    # 初始化方法,接受一个编码器参数
    def __init__(self, encoder=None):
        """
        A dictionary subclass which returns the key value if it is not in the
        dict.

        Parameters
        ----------
        encoder : function or None
            A function which is applied to a key before adding / retrieving it
            from the dictionary. If None, the function defaults to the
            identity. Default is None.
        """
        # 调用父类的初始化方法
        super(Dict, self).__init__()
        # 设置编码器和最大 ID
        self._encoder = encoder
        self._id_max = 0

    # 重写 __setitem__ 方法
    def __setitem__(self, key, value):
        # 如果存在编码器,则对键进行编码
        if self._encoder is not None:
            key = self._encoder(key)
        # 如果键不可哈希,则转换为元组
        elif not isinstance(key, Hashable):
            key = tuple(key)
        # 调用父类的 __setitem__ 方法
        super(Dict, self).__setitem__(key, value)

    # 编码键的方法
    def _encode_key(self, key):
        # 获取父类的实例
        D = super(Dict, self)
        # 对键进行编码
        enc_key = self._encoder(key)
        # 如果编码后的键存在于字典中,则返回对应的值,否则添加新键值对
        if D.__contains__(enc_key):
            val = D.__getitem__(enc_key)
        else:
            val = self._id_max
            D.__setitem__(enc_key, val)
            self._id_max += 1
        return val

    # 重写 __getitem__ 方法
    def __getitem__(self, key):
        # 深拷贝键
        self._key = copy.deepcopy(key)
        # 如果存在编码器,则返回编码后的键对应的值
        if self._encoder is not None:
            return self._encode_key(key)
        # 如果键不可哈希,则转换为元组
        elif not isinstance(key, Hashable):
            key = tuple(key)
        # 调用父类的 __getitem__ 方法
        return super(Dict, self).__getitem__(key)

    # 处理缺失键的方法
    def __missing__(self, key):
        return self._key

numpy-ml\numpy_ml\utils\distance_metrics.py

import numpy as np

# 计算两个实向量之间的欧几里德(L2)距离
def euclidean(x, y):
    """
    Compute the Euclidean (`L2`) distance between two real vectors

    Notes
    -----
    The Euclidean distance between two vectors **x** and **y** is

    .. math::

        d(\mathbf{x}, \mathbf{y}) = \sqrt{ \sum_i (x_i - y_i)^2  }

    Parameters
    ----------
    x,y : :py:class:`ndarray <numpy.ndarray>` s of shape `(N,)`
        The two vectors to compute the distance between

    Returns
    -------
    d : float
        The L2 distance between **x** and **y.
    """
    return np.sqrt(np.sum((x - y) ** 2))


# 计算两个实向量之间的曼哈顿(L1)距离
def manhattan(x, y):
    """
    Compute the Manhattan (`L1`) distance between two real vectors

    Notes
    -----
    The Manhattan distance between two vectors **x** and **y** is

    .. math::

        d(\mathbf{x}, \mathbf{y}) = \sum_i |x_i - y_i|

    Parameters
    ----------
    x,y : :py:class:`ndarray <numpy.ndarray>` s of shape `(N,)`
        The two vectors to compute the distance between

    Returns
    -------
    d : float
        The L1 distance between **x** and **y.
    """
    return np.sum(np.abs(x - y))


# 计算两个实向量之间的切比雪夫(L∞)距离
def chebyshev(x, y):
    """
    Compute the Chebyshev (:math:`L_\infty`) distance between two real vectors

    Notes
    -----
    The Chebyshev distance between two vectors **x** and **y** is

    .. math::

        d(\mathbf{x}, \mathbf{y}) = \max_i |x_i - y_i|

    Parameters
    ----------
    x,y : :py:class:`ndarray <numpy.ndarray>` s of shape `(N,)`
        The two vectors to compute the distance between

    Returns
    -------
    d : float
        The Chebyshev distance between **x** and **y.
    """
    return np.max(np.abs(x - y))


# 计算两个实向量之间的闵可夫斯基(Minkowski)距离
def minkowski(x, y, p):
    """
    Compute the Minkowski-`p` distance between two real vectors.

    Notes
    -----
    The Minkowski-`p` distance between two vectors **x** and **y** is

    .. math::

        d(\mathbf{x}, \mathbf{y}) = \left( \sum_i |x_i - y_i|^p \\right)^{1/p}

    Parameters
    ----------
    # 定义两个形状为`(N,)`的ndarray类型的向量x和y,用于计算它们之间的距离
    x,y : :py:class:`ndarray <numpy.ndarray>` s of shape `(N,)`
        The two vectors to compute the distance between
    # 定义距离函数的参数p,当p = 1时,为L1距离,当p = 2时,为L2距离。当p < 1时,闵可夫斯基-p不满足三角不等式,因此不是有效的距离度量
    p : float > 1
        The parameter of the distance function. When `p = 1`, this is the `L1`
        distance, and when `p=2`, this is the `L2` distance. For `p < 1`,
        Minkowski-`p` does not satisfy the triangle inequality and hence is not
        a valid distance metric.

    # 返回值
    Returns
    -------
    # 返回向量x和y之间的闵可夫斯基-p距离
    d : float
        The Minkowski-`p` distance between **x** and **y**.
    """
    # 计算闵可夫斯基-p距离的公式
    return np.sum(np.abs(x - y) ** p) ** (1 / p)
# 计算两个整数向量之间的汉明距离

# 汉明距离是指两个向量 x 和 y 之间的距离
# 其计算方式为:d(x, y) = 1/N * Σ(1_{x_i ≠ y_i})

# 参数:
# x, y:形状为(N,)的numpy.ndarray数组
# 要计算距离的两个向量。这两个向量应为整数值。

# 返回值:
# d:浮点数
# x 和 y 之间的汉明距离。
def hamming(x, y):
    return np.sum(x != y) / len(x)

numpy-ml\numpy_ml\utils\graphs.py

from abc import ABC, abstractmethod
from collections import defaultdict
from itertools import combinations, permutations
import numpy as np

#######################################################################
#                          Graph Components                           #
#######################################################################

# 定义边对象
class Edge(object):
    def __init__(self, fr, to, w=None):
        """
        A generic directed edge object.

        Parameters
        ----------
        fr: int
            The id of the vertex the edge goes from
        to: int
            The id of the vertex the edge goes to
        w: float, :class:`Object` instance, or None
            The edge weight, if applicable. If weight is an arbitrary Object it
            must have a method called 'sample' which takes no arguments and
            returns a random sample from the weight distribution. If `w` is
            None, no weight is assumed. Default is None.
        """
        self.fr = fr
        self.to = to
        self._w = w

    def __repr__(self):
        return "{} -> {}, weight: {}".format(self.fr, self.to, self._w)

    @property
    def weight(self):
        return self._w.sample() if hasattr(self._w, "sample") else self._w

    def reverse(self):
        """Reverse the edge direction"""
        return Edge(self.t, self.f, self.w)

#######################################################################
#                             Graph Types                             #
#######################################################################

# 定义图的抽象基类
class Graph(ABC):
    def __init__(self, V, E):
        self._I2V = {i: v for i, v in zip(range(len(V)), V)}
        self._V2I = {v: i for i, v in zip(range(len(V)), V)}
        self._G = {i: set() for i in range(len(V))}
        self._V = V
        self._E = E

        self._build_adjacency_list()

    def __getitem__(self, v_i):
        return self.get_neighbors(v_i)
    # 获取给定顶点的内部索引
    def get_index(self, v):
        """Get the internal index for a given vetex"""
        return self._V2I[v]

    # 根据给定内部索引获取原始顶点
    def get_vertex(self, v_i):
        """Get the original vertex from a given internal index"""
        return self._I2V[v_i]

    # 返回图的顶点集合
    @property
    def vertices(self):
        return self._V

    # 返回图的顶点索引集合
    @property
    def indices(self):
        return list(range(len(self.vertices)))

    # 返回图的边集合
    @property
    def edges(self):
        return self._E

    # 返回从具有索引 `v_i` 的顶点可达的顶点的内部索引列表
    def get_neighbors(self, v_i):
        """
        Return the internal indices of the vertices reachable from the vertex
        with index `v_i`.
        """
        return [self._V2I[e.to] for e in self._G[v_i]]

    # 返回图的邻接矩阵表示
    def to_matrix(self):
        """Return an adjacency matrix representation of the graph"""
        adj_mat = np.zeros((len(self._V), len(self._V)))
        for e in self.edges:
            fr, to = self._V2I[e.fr], self._V2I[e.to]
            adj_mat[fr, to] = 1 if e.weight is None else e.weight
        return adj_mat

    # 返回图的邻接字典表示
    def to_adj_dict(self):
        """Return an adjacency dictionary representation of the graph"""
        adj_dict = defaultdict(lambda: list())
        for e in self.edges:
            adj_dict[e.fr].append(e)
        return adj_dict
    def path_exists(self, s_i, e_i):
        """
        Check whether a path exists from vertex index `s_i` to `e_i`.

        Parameters
        ----------
        s_i: Int
            The interal index of the start vertex
        e_i: Int
            The internal index of the end vertex

        Returns
        -------
        path_exists : Boolean
            Whether or not a valid path exists between `s_i` and `e_i`.
        """
        # 初始化队列,起始点为s_i,路径为[s_i]
        queue = [(s_i, [s_i])]
        # 循环直到队列为空
        while len(queue):
            # 取出队列中的第一个元素
            c_i, path = queue.pop(0)
            # 获取当前节点的邻居节点,但不包括已经在路径中的节点
            nbrs_not_on_path = set(self.get_neighbors(c_i)) - set(path)

            # 遍历邻居节点
            for n_i in nbrs_not_on_path:
                # 将邻居节点加入路径,并加入队列
                queue.append((n_i, path + [n_i]))
                # 如果找到终点e_i,则返回True
                if n_i == e_i:
                    return True
        # 如果循环结束仍未找到路径,则返回False
        return False

    def all_paths(self, s_i, e_i):
        """
        Find all simple paths between `s_i` and `e_i` in the graph.

        Notes
        -----
        Uses breadth-first search. Ignores all paths with repeated vertices.

        Parameters
        ----------
        s_i: Int
            The interal index of the start vertex
        e_i: Int
            The internal index of the end vertex

        Returns
        -------
        complete_paths : list of lists
            A list of all paths from `s_i` to `e_i`. Each path is represented
            as a list of interal vertex indices.
        """
        # 初始化存储所有路径的列表
        complete_paths = []
        # 初始化队列,起始点为s_i,路径为[s_i]
        queue = [(s_i, [s_i])]

        # 循环直到队列为空
        while len(queue):
            # 取出队列中的第一个元素
            c_i, path = queue.pop(0)
            # 获取当前节点的邻居节点,但不包括已经在路径中的节点
            nbrs_not_on_path = set(self.get_neighbors(c_i)) - set(path)

            # 遍历邻居节点
            for n_i in nbrs_not_on_path:
                # 如果找到终点e_i,则将完整路径加入complete_paths
                if n_i == e_i:
                    complete_paths.append(path + [n_i])
                else:
                    # 将邻居节点加入路径,并加入队列
                    queue.append((n_i, path + [n_i]))

        # 返回所有找到的路径
        return complete_paths

    @abstractmethod
    def _build_adjacency_list(self):
        pass
class DiGraph(Graph):
    def __init__(self, V, E):
        """
        A generic directed graph object.

        Parameters
        ----------
        V : list
            A list of vertex IDs.
        E : list of :class:`Edge <numpy_ml.utils.graphs.Edge>` objects
            A list of directed edges connecting pairs of vertices in ``V``.
        """
        # 调用父类的构造函数,传入顶点列表和边列表
        super().__init__(V, E)
        # 设置图为有向图
        self.is_directed = True
        # 初始化拓扑排序列表为空
        self._topological_ordering = []

    def _build_adjacency_list(self):
        """Encode directed graph as an adjancency list"""
        # 假设没有平行边
        for e in self.edges:
            # 获取起始顶点在顶点列表中的索引
            fr_i = self._V2I[e.fr]
            # 将边添加到起始顶点的邻接表中
            self._G[fr_i].add(e)

    def reverse(self):
        """Reverse the direction of all edges in the graph"""
        # 返回一个新的有向图对象,边的方向取反
        return DiGraph(self.vertices, [e.reverse() for e in self.edges])

    def is_acyclic(self):
        """Check whether the graph contains cycles"""
        # 检查图是否包含环,通过判断拓扑排序结果是否为None来判断
        return self.topological_ordering() is not None


class UndirectedGraph(Graph):
    def __init__(self, V, E):
        """
        A generic undirected graph object.

        Parameters
        ----------
        V : list
            A list of vertex IDs.
        E : list of :class:`Edge <numpy_ml.utils.graphs.Edge>` objects
            A list of edges connecting pairs of vertices in ``V``. For any edge
            connecting vertex `u` to vertex `v`, :class:`UndirectedGraph
            <numpy_ml.utils.graphs.UndirectedGraph>` will assume that there
            exists a corresponding edge connecting `v` to `u`, even if this is
            not present in `E`.
        """
        # 调用父类的构造函数,传入顶点列表和边列表
        super().__init__(V, E)
        # 设置图为无向图
        self.is_directed = False
    # 构建邻接表来表示无向、无权重图
    def _build_adjacency_list(self):
        """Encode undirected, unweighted graph as an adjancency list"""
        # 假设没有平行边
        # 每条边出现两次,分别为 (u,v) 和 (v,u)
        for e in self.edges:
            # 获取起始节点和目标节点在节点列表中的索引
            fr_i = self._V2I[e.fr]
            to_i = self._V2I[e.to]

            # 将边添加到起始节点的邻接表中
            self._G[fr_i].add(e)
            # 将边的反向边添加到目标节点的邻接表中
            self._G[to_i].add(e.reverse())
#######################################################################
#                          Graph Generators                           #
#######################################################################

# 生成一个无权 Erdős-Rényi 随机图
def random_unweighted_graph(n_vertices, edge_prob=0.5, directed=False):
    """
    Generate an unweighted Erdős-Rényi random graph [*]_.

    References
    ----------
    .. [*] Erdős, P. and Rényi, A. (1959). On Random Graphs, *Publ. Math. 6*, 290.

    Parameters
    ----------
    n_vertices : int
        The number of vertices in the graph.
    edge_prob : float in [0, 1]
        The probability of forming an edge between two vertices. Default is
        0.5.
    directed : bool
        Whether the edges in the graph should be directed. Default is False.

    Returns
    -------
    G : :class:`Graph` instance
        The resulting random graph.
    """
    vertices = list(range(n_vertices))
    candidates = permutations(vertices, 2) if directed else combinations(vertices, 2)

    edges = []
    # 遍历所有可能的边
    for (fr, to) in candidates:
        # 根据概率确定是否生成边
        if np.random.rand() <= edge_prob:
            edges.append(Edge(fr, to))

    # 如果是有向图,则返回有向图对象;否则返回无向图对象
    return DiGraph(vertices, edges) if directed else UndirectedGraph(vertices, edges)


# 创建一个 '随机' 无权有向无环图,通过修剪所有反向连接来生成
def random_DAG(n_vertices, edge_prob=0.5):
    """
    Create a 'random' unweighted directed acyclic graph by pruning all the
    backward connections from a random graph.

    Parameters
    ----------
    n_vertices : int
        The number of vertices in the graph.
    edge_prob : float in [0, 1]
        The probability of forming an edge between two vertices in the
        underlying random graph, before edge pruning. Default is 0.5.

    Returns
    -------
    G : :class:`Graph` instance
        The resulting DAG.
    """
    # 生成一个有向随机图
    G = random_unweighted_graph(n_vertices, edge_prob, directed=True)

    # 修剪边以删除顶点之间的反向连接
    G = DiGraph(G.vertices, [e for e in G.edges if e.fr < e.to])
    # 如果我们删除了所有边,则生成一个新的图
    while not len(G.edges):
        # 生成一个具有随机权重的有向图
        G = random_unweighted_graph(n_vertices, edge_prob, directed=True)
        # 创建一个新的有向图,只包含起点小于终点的边
        G = DiGraph(G.vertices, [e for e in G.edges if e.fr < e.to])
    # 返回生成的图
    return G

numpy-ml\numpy_ml\utils\kernels.py

# 导入 re 模块,用于正则表达式操作
# 导入 ABC 抽象基类和 abstractmethod 装饰器
import re
from abc import ABC, abstractmethod

# 导入 numpy 模块,并重命名为 np
import numpy as np

# 定义一个抽象基类 KernelBase
class KernelBase(ABC):
    # 初始化方法
    def __init__(self):
        super().__init__()
        # 初始化参数字典和超参数字典
        self.parameters = {}
        self.hyperparameters = {}

    # 抽象方法,用于计算核函数
    @abstractmethod
    def _kernel(self, X, Y):
        raise NotImplementedError

    # 重载 __call__ 方法,调用 _kernel 方法
    def __call__(self, X, Y=None):
        """Refer to documentation for the `_kernel` method"""
        return self._kernel(X, Y)

    # 重载 __str__ 方法,返回模型的字符串表示
    def __str__(self):
        P, H = self.parameters, self.hyperparameters
        p_str = ", ".join(["{}={}".format(k, v) for k, v in P.items()])
        return "{}({})".format(H["id"], p_str)

    # 定义 summary 方法,返回模型参数、超参数和 ID 的字典
    def summary(self):
        """Return the dictionary of model parameters, hyperparameters, and ID"""
        return {
            "id": self.hyperparameters["id"],
            "parameters": self.parameters,
            "hyperparameters": self.hyperparameters,
        }
    def set_params(self, summary_dict):
        """
        Set the model parameters and hyperparameters using the settings in
        `summary_dict`.

        Parameters
        ----------
        summary_dict : dict
            A dictionary with keys 'parameters' and 'hyperparameters',
            structured as would be returned by the :meth:`summary` method. If
            a particular (hyper)parameter is not included in this dict, the
            current value will be used.

        Returns
        -------
        new_kernel : :doc:`Kernel <numpy_ml.utils.kernels>` instance
            A kernel with parameters and hyperparameters adjusted to those
            specified in `summary_dict`.
        """
        # 将 self 和 summary_dict 分别赋值给 kr 和 sd
        kr, sd = self, summary_dict

        # 将 `parameters` 和 `hyperparameters` 嵌套字典合并为一个字典
        flatten_keys = ["parameters", "hyperparameters"]
        for k in flatten_keys:
            if k in sd:
                entry = sd[k]
                sd.update(entry)
                del sd[k]

        # 遍历 summary_dict 中的键值对
        for k, v in sd.items():
            # 如果键在 self 的 parameters 中,则更新参数值
            if k in self.parameters:
                kr.parameters[k] = v
            # 如果键在 self 的 hyperparameters 中,则更新超参数值
            if k in self.hyperparameters:
                kr.hyperparameters[k] = v
        # 返回更新后的 kernel 实例
        return kr
# 定义线性核函数类,继承自 KernelBase 类
class LinearKernel(KernelBase):
    # 初始化函数,设置线性核函数的参数 c0
    def __init__(self, c0=0):
        """
        The linear (i.e., dot-product) kernel.

        Notes
        -----
        For input vectors :math:`\mathbf{x}` and :math:`\mathbf{y}`, the linear
        kernel is:

        .. math::

            k(\mathbf{x}, \mathbf{y}) = \mathbf{x}^\\top \mathbf{y} + c_0

        Parameters
        ----------
        c0 : float
            An "inhomogeneity" parameter. When `c0` = 0, the kernel is said to be
            homogenous. Default is 1.
        """
        # 调用父类的初始化函数
        super().__init__()
        # 设置超参数字典
        self.hyperparameters = {"id": "LinearKernel"}
        # 设置参数字典,包括 c0 参数
        self.parameters = {"c0": c0}

    # 计算线性核函数的内部函数,计算输入矩阵 X 和 Y 的点积
    def _kernel(self, X, Y=None):
        """
        Compute the linear kernel (i.e., dot-product) between all pairs of rows in
        `X` and `Y`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            Collection of `N` input vectors
        Y : :py:class:`ndarray <numpy.ndarray>` of shape `(M, C)` or None
            Collection of `M` input vectors. If None, assume `Y` = `X`.
            Default is None.

        Returns
        -------
        out : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            Similarity between `X` and `Y`, where index (`i`, `j`) gives
            :math:`k(x_i, y_j)`.
        """
        # 检查输入矩阵 X 和 Y,确保它们符合要求
        X, Y = kernel_checks(X, Y)
        # 返回 X 和 Y 的点积加上参数 c0 的结果
        return X @ Y.T + self.parameters["c0"]


class PolynomialKernel(KernelBase):
    # 初始化多项式核函数对象,设置默认参数值
    def __init__(self, d=3, gamma=None, c0=1):
        """
        The degree-`d` polynomial kernel.

        Notes
        -----
        For input vectors :math:`\mathbf{x}` and :math:`\mathbf{y}`, the polynomial
        kernel is:

        .. math::

            k(\mathbf{x}, \mathbf{y}) = (\gamma \mathbf{x}^\\top \mathbf{y} + c_0)^d

        In contrast to the linear kernel, the polynomial kernel also computes
        similarities *across* dimensions of the **x** and **y** vectors,
        allowing it to account for interactions between features.  As an
        instance of the dot product family of kernels, the polynomial kernel is
        invariant to a rotation of the coordinates about the origin, but *not*
        to translations.

        Parameters
        ----------
        d : int
            Degree of the polynomial kernel. Default is 3.
        gamma : float or None
            A scaling parameter for the dot product between `x` and `y`,
            determining the amount of smoothing/resonlution of the kernel.
            Larger values result in greater smoothing. If None, defaults to 1 /
            `C`.  Sometimes referred to as the kernel bandwidth.  Default is
            None.
        c0 : float
            Parameter trading off the influence of higher-order versus lower-order
            terms in the polynomial. If `c0` = 0, the kernel is said to be
            homogenous. Default is 1.
        """
        # 调用父类的初始化方法
        super().__init__()
        # 设置核函数的超参数字典
        self.hyperparameters = {"id": "PolynomialKernel"}
        # 设置核函数的参数字典
        self.parameters = {"d": d, "c0": c0, "gamma": gamma}
    # 定义一个私有方法,计算输入矩阵 X 和 Y 之间的度为 d 的多项式核
    def _kernel(self, X, Y=None):
        """
        Compute the degree-`d` polynomial kernel between all pairs of rows in `X`
        and `Y`.

        # 参数:
        # X:形状为 `(N, C)` 的 ndarray,包含 `N` 个输入向量
        # Y:形状为 `(M, C)` 的 ndarray 或 None
        #     包含 `M` 个输入向量。如果为 None,则假定 `Y = X`。默认为 None。

        # 返回:
        # out:形状为 `(N, M)` 的 ndarray
        #     `X` 和 `Y` 之间的相似度,其中索引 (`i`, `j`) 给出了 :math:`k(x_i, y_j)`(即核的格拉姆矩阵)。
        """
        # 获取模型参数
        P = self.parameters
        # 检查并处理输入矩阵 X 和 Y
        X, Y = kernel_checks(X, Y)
        # 计算 gamma 值,如果参数中未指定 gamma,则默认为 1 / X.shape[1]
        gamma = 1 / X.shape[1] if P["gamma"] is None else P["gamma"]
        # 计算多项式核的值并返回结果
        return (gamma * (X @ Y.T) + P["c0"]) ** P["d"]
# 定义 RBFKernel 类,继承自 KernelBase 类
class RBFKernel(KernelBase):
    # 初始化方法,接受一个 sigma 参数
    def __init__(self, sigma=None):
        """
        Radial basis function (RBF) / squared exponential kernel.

        Notes
        -----
        For input vectors :math:`\mathbf{x}` and :math:`\mathbf{y}`, the radial
        basis function kernel is:

        .. math::

            k(\mathbf{x}, \mathbf{y}) = \exp \left\{ -0.5
                \left\lVert \\frac{\mathbf{x} -
                    \mathbf{y}}{\sigma} \\right\\rVert_2^2 \\right\}

        The RBF kernel decreases with distance and ranges between zero (in the
        limit) to one (when **x** = **y**). Notably, the implied feature space
        of the kernel has an infinite number of dimensions.

        Parameters
        ----------
        sigma : float or array of shape `(C,)` or None
            A scaling parameter for the vectors **x** and **y**, producing an
            isotropic kernel if a float, or an anisotropic kernel if an array of
            length `C`.  Larger values result in higher resolution / greater
            smoothing. If None, defaults to :math:`\sqrt(C / 2)`. Sometimes
            referred to as the kernel 'bandwidth'. Default is None.
        """
        # 调用父类的初始化方法
        super().__init__()
        # 设置超参数字典
        self.hyperparameters = {"id": "RBFKernel"}
        # 设置参数字典,包含 sigma 参数
        self.parameters = {"sigma": sigma}
    # 计算输入向量 X 和 Y 之间的径向基函数(RBF)核
    def _kernel(self, X, Y=None):
        """
        Computes the radial basis function (RBF) kernel between all pairs of
        rows in `X` and `Y`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            Collection of `N` input vectors, each with dimension `C`.
        Y : :py:class:`ndarray <numpy.ndarray>` of shape `(M, C)`
            Collection of `M` input vectors. If None, assume `Y` = `X`. Default
            is None.

        Returns
        -------
        out : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            Similarity between `X` and `Y` where index (i, j) gives :math:`k(x_i, y_j)`.
        """
        # 获取参数字典
        P = self.parameters
        # 检查输入向量 X 和 Y,确保它们是合法的 ndarray
        X, Y = kernel_checks(X, Y)
        # 计算默认的 sigma 值,如果参数字允许的话
        sigma = np.sqrt(X.shape[1] / 2) if P["sigma"] is None else P["sigma"]
        # 计算核矩阵,使用高斯核函数
        return np.exp(-0.5 * pairwise_l2_distances(X / sigma, Y / sigma) ** 2)
class KernelInitializer(object):
    # 初始化学习率调度器的类,可以接受以下类型的输入:
    # (a) `KernelBase` 实例的 __str__ 表示
    # (b) `KernelBase` 实例
    # (c) 参数字典(例如,通过 `KernelBase` 实例的 `summary` 方法生成的)

    # 如果 `param` 为 None,则返回 `LinearKernel`
    def __init__(self, param=None):
        self.param = param

    def __call__(self):
        param = self.param
        if param is None:
            kernel = LinearKernel()
        elif isinstance(param, KernelBase):
            kernel = param
        elif isinstance(param, str):
            kernel = self.init_from_str()
        elif isinstance(param, dict):
            kernel = self.init_from_dict()
        return kernel

    def init_from_str(self):
        # 定义正则表达式,用于解析参数字符串
        r = r"([a-zA-Z0-9]*)=([^,)]*)"
        # 将参数字符串转换为小写
        kr_str = self.param.lower()
        # 解析参数字符串,生成参数字典
        kwargs = dict([(i, eval(j)) for (i, j) in re.findall(r, self.param)])

        if "linear" in kr_str:
            kernel = LinearKernel(**kwargs)
        elif "polynomial" in kr_str:
            kernel = PolynomialKernel(**kwargs)
        elif "rbf" in kr_str:
            kernel = RBFKernel(**kwargs)
        else:
            raise NotImplementedError("{}".format(kr_str))
        return kernel

    def init_from_dict(self):
        S = self.param
        # 获取超参数字典
        sc = S["hyperparameters"] if "hyperparameters" in S else None

        if sc is None:
            raise ValueError("Must have `hyperparameters` key: {}".format(S))

        if sc and sc["id"] == "LinearKernel":
            scheduler = LinearKernel().set_params(S)
        elif sc and sc["id"] == "PolynomialKernel":
            scheduler = PolynomialKernel().set_params(S)
        elif sc and sc["id"] == "RBFKernel":
            scheduler = RBFKernel().set_params(S)
        elif sc:
            raise NotImplementedError("{}".format(sc["id"]))
        return scheduler
# 检查输入数据 X 和 Y 的维度,如果 X 是一维数组,则将其转换为二维数组
X = X.reshape(-1, 1) if X.ndim == 1 else X
# 如果 Y 为 None,则将 Y 设置为 X
Y = X if Y is None else Y
# 检查 Y 的维度,如果 Y 是一维数组,则将其转换为二维数组
Y = Y.reshape(-1, 1) if Y.ndim == 1 else Y

# 断言 X 必须有两个维度
assert X.ndim == 2, "X must have 2 dimensions, but got {}".format(X.ndim)
# 断言 Y 必须有两个维度
assert Y.ndim == 2, "Y must have 2 dimensions, but got {}".format(Y.ndim)
# 断言 X 和 Y 必须有相同的列数
assert X.shape[1] == Y.shape[1], "X and Y must have the same number of columns"
# 返回经过检查和处理后的 X 和 Y
return X, Y


def pairwise_l2_distances(X, Y):
    """
    A fast, vectorized way to compute pairwise l2 distances between rows in `X`
    and `Y`.

    Notes
    -----
    An entry of the pairwise Euclidean distance matrix for two vectors is

    .. math::

        d[i, j]  &=  \sqrt{(x_i - y_i) @ (x_i - y_i)} \\\\
                 &=  \sqrt{sum (x_i - y_j)^2} \\\\
                 &=  \sqrt{sum (x_i)^2 - 2 x_i y_j + (y_j)^2}

    The code below computes the the third line using numpy broadcasting
    fanciness to avoid any for loops.

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
        Collection of `N` input vectors
    Y : :py:class:`ndarray <numpy.ndarray>` of shape `(M, C)`
        Collection of `M` input vectors. If None, assume `Y` = `X`. Default is
        None.

    Returns
    -------
    dists : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
        Pairwise distance matrix. Entry (i, j) contains the `L2` distance between
        :math:`x_i` and :math:`y_j`.
    """
    # 计算 X 和 Y 之间的 L2 距离矩阵
    D = -2 * X @ Y.T + np.sum(Y ** 2, axis=1) + np.sum(X ** 2, axis=1)[:, np.newaxis]
    # 将小于 0 的值裁剪为 0(由于数值精度问题可能导致小于 0 的值)
    D[D < 0] = 0
    # 返回计算得到的 L2 距离矩阵
    return np.sqrt(D)

numpy-ml\numpy_ml\utils\misc.py

# 导入 numpy 库
import numpy as np

# 重新定义 logsumexp 函数,计算对数概率的和
def logsumexp(log_probs, axis=None):
    # 计算输入数组中的最大值
    _max = np.max(log_probs)
    # 对数概率减去最大值,避免数值溢出
    ds = log_probs - _max
    # 计算指数和
    exp_sum = np.exp(ds).sum(axis=axis)
    # 返回对数概率的和
    return _max + np.log(exp_sum)

# 计算对数高斯概率密度函数 log N(x_i | mu, sigma)
def log_gaussian_pdf(x_i, mu, sigma):
    # 获取均值向量的长度
    n = len(mu)
    # 计算常数项 a
    a = n * np.log(2 * np.pi)
    # 计算矩阵的行列式的自然对数
    _, b = np.linalg.slogdet(sigma)

    # 计算 y = sigma^(-1) * (x_i - mu)
    y = np.linalg.solve(sigma, x_i - mu)
    # 计算 c = (x_i - mu)^T * y
    c = np.dot(x_i - mu, y)
    # 返回对数高斯概率密度函数的值
    return -0.5 * (a + b + c)

Utilities

The utilities module implements a number of useful functions and objects that
power other ML algorithms across the repo.

  • data_structures.py implements a few useful data structures

    • A max- and min-heap ordered priority queue
    • A ball tree with the KNS1 algorithm (Omohundro, 1989; Moore & Gray, 2006)
    • A discrete sampler implementing Vose’s algorithm for the alias method (Walker, 1977; Vose, 1991)
  • kernels.py implements several general-purpose similarity kernels

    • Linear kernel
    • Polynomial kernel
    • Radial basis function kernel
  • distance_metrics.py implements common distance metrics

    • Euclidean (L2) distance
    • Manhattan (L1) distance
    • Chebyshev (L-infinity) distance
    • Minkowski-p distance
    • Hamming distance
  • graphs.py implements simple data structures and algorithms for graph
    processing.

    • Undirected + directed graph objects allowing for probabilistic edge weights
    • Graph generators (Erdos-Renyi, random DAGs)
    • Topological sorting for DAGs
    • Cycle detection
    • Simple path-finding
  • windows.py implements several common windowing functions

    • Hann
    • Hamming
    • Blackman-Harris
    • Generalized cosine
  • testing.py implements helper functions that prove useful when writing unit
    tests, including data generators and various assert statements

numpy-ml\numpy_ml\utils\testing.py

# 导入必要的库
import numbers
import numpy as np

# 断言函数

# 检查数组 `X` 是否沿其主对角线对称
def is_symmetric(X):
    return np.allclose(X, X.T)

# 检查矩阵 `X` 是否对称且正定
def is_symmetric_positive_definite(X):
    if is_symmetric(X):
        try:
            # 如果矩阵对称,检查 Cholesky 分解是否存在(仅对对称/共轭正定矩阵定义)
            np.linalg.cholesky(X)
            return True
        except np.linalg.LinAlgError:
            return False
    return False

# 检查 `X` 是否包含沿列和为1的概率
def is_stochastic(X):
    msg = "Array should be stochastic along the columns"
    assert len(X[X < 0]) == len(X[X > 1]) == 0, msg
    assert np.allclose(np.sum(X, axis=1), np.ones(X.shape[0]), msg)
    return True

# 检查值 `a` 是否为数值型
def is_number(a):
    return isinstance(a, numbers.Number)

# 如果数组 `x` 是二进制数组且只有一个1,则返回True
def is_one_hot(x):
    msg = "Matrix should be one-hot binary"
    assert np.array_equal(x, x.astype(bool)), msg
    assert np.allclose(np.sum(x, axis=1), np.ones(x.shape[0]), msg)
    return True

# 如果数组 `x` 仅由二进制值组成,则返回True
def is_binary(x):
    msg = "Matrix must be binary"
    assert np.array_equal(x, x.astype(bool), msg)
    return True
# 创建一个形状为 (`n_examples`, `n_classes`) 的随机独热编码矩阵
def random_one_hot_matrix(n_examples, n_classes):
    # 创建一个单位矩阵,行数为类别数,列数为类别数,表示每个类别的独热编码
    X = np.eye(n_classes)
    # 从类别数中随机选择 n_examples 个类别的独热编码
    X = X[np.random.choice(n_classes, n_examples)]
    return X


# 创建一个形状为 (`n_examples`, `n_classes`) 的随机随机矩阵
def random_stochastic_matrix(n_examples, n_classes):
    # 创建一个形状为 (`n_examples`, `n_classes`) 的随机矩阵
    X = np.random.rand(n_examples, n_classes)
    # 对每一行进行归一化,使得每一行的和为1
    X /= X.sum(axis=1, keepdims=True)
    return X


# 创建一个形状为 `shape` 的随机实值张量。如果 `standardize` 为 True,则确保每列的均值为0,标准差为1
def random_tensor(shape, standardize=False):
    # 创建一个形状为 `shape` 的随机偏移量
    offset = np.random.randint(-300, 300, shape)
    # 创建一个形状为 `shape` 的随机矩阵,并加上偏移量
    X = np.random.rand(*shape) + offset

    if standardize:
        # 计算一个很小的数,用于避免除以0
        eps = np.finfo(float).eps
        # 对每列进行标准化,使得均值为0,标准差为1
        X = (X - X.mean(axis=0)) / (X.std(axis=0) + eps)
    return X


# 创建一个形状为 `shape` 的随机二值张量。`sparsity` 是一个控制输出张量中0和1比例的值
def random_binary_tensor(shape, sparsity=0.5):
    # 创建一个形状为 `shape` 的随机矩阵,大于等于 (1 - sparsity) 的值设为1,小于 (1 - sparsity) 的值设为0
    return (np.random.rand(*shape) >= (1 - sparsity)).astype(float)


# 生成一个由 `n_words` 个单词组成的随机段落。如果 `vocab` 不为 None,则从该列表中随机抽取单词;否则,从包含 26 个拉丁单词的集合中均匀抽取单词
    # 如果输入的词汇表为空,则使用默认的词汇表
    if vocab is None:
        vocab = [
            "at",
            "stet",
            "accusam",
            "aliquyam",
            "clita",
            "lorem",
            "ipsum",
            "dolor",
            "dolore",
            "dolores",
            "sit",
            "amet",
            "consetetur",
            "sadipscing",
            "elitr",
            "sed",
            "diam",
            "nonumy",
            "eirmod",
            "duo",
            "ea",
            "eos",
            "erat",
            "est",
            "et",
            "gubergren",
        ]
    # 返回一个包含 n_words 个随机词汇的列表
    return [np.random.choice(vocab) for _ in range(n_words)]
# 自定义警告类,继承自 RuntimeWarning
class DependencyWarning(RuntimeWarning):
    pass

numpy-ml\numpy_ml\utils\windows.py

import numpy as np  # 导入 NumPy 库


def blackman_harris(window_len, symmetric=False):
    """
    The Blackman-Harris window.

    Notes
    -----
    The Blackman-Harris window is an instance of the more general class of
    cosine-sum windows where `K=3`. Additional coefficients extend the Hamming
    window to further minimize the magnitude of the nearest side-lobe in the
    frequency response.

    .. math::
        \\text{bh}(n) = a_0 - a_1 \cos\left(\\frac{2 \pi n}{N}\\right) +
            a_2 \cos\left(\\frac{4 \pi n }{N}\\right) -
                a_3 \cos\left(\\frac{6 \pi n}{N}\\right)

    where `N` = `window_len` - 1, :math:`a_0` = 0.35875, :math:`a_1` = 0.48829,
    :math:`a_2` = 0.14128, and :math:`a_3` = 0.01168.

    Parameters
    ----------
    window_len : int
        The length of the window in samples. Should be equal to the
        `frame_width` if applying to a windowed signal.
    symmetric : bool
        If False, create a 'periodic' window that can be used in with an FFT /
        in spectral analysis.  If True, generate a symmetric window that can be
        used in, e.g., filter design. Default is False.

    Returns
    -------
    window : :py:class:`ndarray <numpy.ndarray>` of shape `(window_len,)`
        The window
    """
    return generalized_cosine(  # 调用 generalized_cosine 函数
        window_len, [0.35875, 0.48829, 0.14128, 0.01168], symmetric
    )


def hamming(window_len, symmetric=False):
    """
    The Hamming window.

    Notes
    -----
    The Hamming window is an instance of the more general class of cosine-sum
    windows where `K=1` and :math:`a_0 = 0.54`. Coefficients selected to
    minimize the magnitude of the nearest side-lobe in the frequency response.

    .. math::

        \\text{hamming}(n) = 0.54 -
            0.46 \cos\left(\\frac{2 \pi n}{\\text{window_len} - 1}\\right)

    Parameters
    ----------
    window_len : int
        The length of the window in samples. Should be equal to the
        `frame_width` if applying to a windowed signal.
    symmetric : bool
        # 定义一个布尔型参数 symmetric,用于指定是否生成对称窗口
        If False, create a 'periodic' window that can be used in with an FFT /
        in spectral analysis.  If True, generate a symmetric window that can be
        used in, e.g., filter design. Default is False.
        # 如果 symmetric 为 False,则创建一个周期性窗口,可用于 FFT 或频谱分析;如果为 True,则生成对称窗口,可用于滤波器设计,默认为 False

    Returns
    -------
    window : :py:class:`ndarray <numpy.ndarray>` of shape `(window_len,)`
        # 返回值为一个形状为 (window_len,) 的 ndarray 类型的窗口数组
        The window
    """
    # 调用 generalized_cosine 函数,传入窗口长度和对应的参数列表 [0.54, 1 - 0.54],以及 symmetric 参数
    return generalized_cosine(window_len, [0.54, 1 - 0.54], symmetric)
# 定义汉宁窗口函数
def hann(window_len, symmetric=False):
    """
    The Hann window.

    Notes
    -----
    汉宁窗口是余弦和窗口的一个特例,其中 `K=1` 和 :math:`a_0` = 0.5。与 Hamming 窗口不同,汉宁窗口的端点接触到零。

    .. math::

        \\text{hann}(n) = 0.5 - 0.5 \cos\left(\\frac{2 \pi n}{\\text{window_len} - 1}\\right)

    Parameters
    ----------
    window_len : int
        窗口的长度(以样本为单位)。如果应用于窗口信号,则应等于 `frame_width`。
    symmetric : bool
        如果为 False,则创建一个可以在 FFT / 频谱分析中使用的“周期性”窗口。如果为 True,则生成一个可以在滤波器设计等方面使用的对称窗口。默认为 False。

    Returns
    -------
    window : :py:class:`ndarray <numpy.ndarray>` of shape `(window_len,)`
        窗口
    """
    return generalized_cosine(window_len, [0.5, 0.5], symmetric)


# 定义广义余弦窗口函数
def generalized_cosine(window_len, coefs, symmetric=False):
    """
    The generalized cosine family of window functions.

    Notes
    -----
    广义余弦窗口是余弦项的简单加权和。

    对于 :math:`n \in \{0, \ldots, \\text{window_len} \}`:

    .. math::

        \\text{GCW}(n) = \sum_{k=0}^K (-1)^k a_k \cos\left(\\frac{2 \pi k n}{\\text{window_len}}\\right)

    Parameters
    ----------
    window_len : int
        窗口的长度(以样本为单位)。如果应用于窗口信号,则应等于 `frame_width`。
    coefs: list of floats
        :math:`a_k` 系数值
    symmetric : bool
        如果为 False,则创建一个可以在 FFT / 频谱分析中使用的“周期性”窗口。如果为 True,则生成一个可以在滤波器设计等方面使用的对称窗口。默认为 False.

    Returns
    -------

    """
    window : :py:class:`ndarray <numpy.ndarray>` of shape `(window_len,)`
        The window
    """
    # 如果不是对称窗口,窗口长度加1
    window_len += 1 if not symmetric else 0
    # 生成等间距的数组,范围为 -π 到 π,长度为 window_len
    entries = np.linspace(-np.pi, np.pi, window_len)  # (-1)^k * 2pi*n / window_len
    # 根据给定的系数生成窗口
    window = np.sum([ak * np.cos(k * entries) for k, ak in enumerate(coefs)], axis=0)
    # 如果不是对称窗口,去掉最后一个元素
    return window[:-1] if not symmetric else window
# 定义一个 WindowInitializer 类
class WindowInitializer:
    # 定义 __call__ 方法,用于初始化窗口函数
    def __call__(self, window):
        # 如果窗口函数为 hamming,则返回 hamming 函数
        if window == "hamming":
            return hamming
        # 如果窗口函数为 blackman_harris,则返回 blackman_harris 函数
        elif window == "blackman_harris":
            return blackman_harris
        # 如果窗口函数为 hann,则返回 hann 函数
        elif window == "hann":
            return hann
        # 如果窗口函数为 generalized_cosine,则返回 generalized_cosine 函数
        elif window == "generalized_cosine":
            return generalized_cosine
        # 如果窗口函数不在以上几种情况中,则抛出 NotImplementedError 异常
        else:
            raise NotImplementedError("{}".format(window))

numpy-ml\numpy_ml\utils\__init__.py

# 导入测试模块
from . import testing
# 导入数据结构模块
from . import data_structures
# 导入距离度量模块
from . import distance_metrics
# 导入核函数模块
from . import kernels
# 导入窗口函数模块
from . import windows
# 导入图模块
from . import graphs
# 导入杂项模块
from . import misc

numpy-ml\numpy_ml\__init__.py

# noqa
"""Common ML and ML-adjacent algorithms implemented in NumPy"""

# 导入自定义的工具模块
from . import utils
# 导入数据预处理模块
from . import preprocessing

# 导入高斯混合模型算法模块
from . import gmm
# 导入隐马尔可夫模型算法模块
from . import hmm
# 导入线性判别分析算法模块
from . import lda
# 导入线性模型算法模块
from . import linear_models
# 导入神经网络算法模块
from . import neural_nets
# 导入 n 元语法模块
from . import ngram
# 导入非参数模型算法模块
from . import nonparametric
# 导入强化学习模型算法模块
from . import rl_models
# 导入树模型算法模块
from . import trees
# 导入赌博机算法模块
from . import bandits
# 导入因子分解算法模块
from . import factorization

numpy-ml\setup.py

# 禁用 flake8 检查
from codecs import open
# 导入 setup 和 find_packages 函数
from setuptools import setup, find_packages

# 读取 README.md 文件内容作为长描述
with open("README.md", encoding="utf-8") as f:
    LONG_DESCRIPTION = f.read()

# 读取 requirements.txt 文件内容,去除空行后作为依赖列表
with open("requirements.txt") as requirements:
    REQUIREMENTS = [r.strip() for r in requirements if r != "\n"]

# 定义项目相关链接
PROJECT_URLS = {
    "Bug Tracker": "https://github.com/ddbourgin/numpy-ml/issues",
    "Documentation": "https://numpy-ml.readthedocs.io/en/latest/",
    "Source": "https://github.com/ddbourgin/numpy-ml",
}

# 设置项目信息
setup(
    name="numpy-ml",
    version="0.1.2",
    author="David Bourgin",
    author_email="ddbourgin@gmail.com",
    project_urls=PROJECT_URLS,
    url="https://github.com/ddbourgin/numpy-ml",
    description="Machine learning in NumPy",
    long_description=LONG_DESCRIPTION,
    long_description_content_type="text/markdown",
    install_requires=REQUIREMENTS,
    packages=find_packages(),
    license="GPLv3+",
    include_package_data=True,
    python_requires=">=3.5",
    extras_require={"rl": ["gym", "matplotlib"]},
    classifiers=[
        "Development Status :: 3 - Alpha",
        "Intended Audience :: Science/Research",
        "Intended Audience :: Developers",
        "Topic :: Scientific/Engineering",
        "License :: OSI Approved :: GNU General Public License (GPL)",
        "Operating System :: OS Independent",
        "Programming Language :: Python :: 3",
    ],
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/390215.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java8新特性——StreamAPI

说明&#xff1a; java8中有两大最为重要的改变。第一个是Lambda表达式&#xff1b;另外一个则是Stream API。 Stream API&#xff08;java.util.stream&#xff09;把真正的函数式编程风格引入java。这是目前为止对java类库最好的补充&#xff0c;因为Stream API可以极大提供j…

基于微信小程序的在线课堂的研究与实现,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

Codeforces Round 926 (Div. 2) B. Sasha and the Drawing (Java)

Codeforces Round 926 (Div. 2) B. Sasha and the Drawing (Java) 比赛链接&#xff1a;Codeforces Round 926 (Div. 2) B题传送门&#xff1a;B. Sasha and the Drawing 题目&#xff1a;B. Sasha and the Drawing Example input 3 4 3 3 3 10 3 9 4 7 7 11 2 3output 2 …

VMware Workstation 17.0 虚拟机安装MS-DOS 7.1完整详细步骤图文教程

VMware Workstation 17.0 虚拟机安装MS-DOS 7.1完整详细步骤图文教程 一、配置MS-DOS虚拟机机器环境二、安装MS-DOS磁盘操作系统 一、配置MS-DOS虚拟机机器环境 1.打开VMware Workstation Pro 2.新建虚拟机 3.建议选择【典型】&#xff0c;之后点击【下一步】 关于【自定义…

前端秘法基础式(CSS)(第一卷)

一.认识CSS CSS 指的是层叠样式表&#xff08;Cascading Style Sheets&#xff09;&#xff0c;它是一种用于描述网页外观和布局的语法 CSS 可以定义网页中元素的字体、颜色、大小、位置、背景等样式&#xff0c;使网页具有美观的外观和统 一的风格。 通过将 CSS 样式表与 HTML…

Qt Creator 继承分文件编写代码流程实现简单案列

Qt Creator 继承分文件流程实现简单案列 打开Qt Creator&#xff0c;新建c项目 添加类 完成之后&#xff0c;会自动生成.h和.cpp文件 一、animal.h文件 主要用来写类&#xff0c;包括成员变量和函数 #ifndef ANIMAL_H #define ANIMAL_H #include <iostream> #inclu…

C#上位机与三菱PLC的通信04--MC协议之A-1E报文测试

到目前为止&#xff0c;还没有网上有哪个文章有我如此的报文分析&#xff0c;操作实例&#xff0c;一大批都是抄来抄去&#xff0c;没有截图&#xff0c;没有说明&#xff0c;没有实例&#xff0c;有卵用呀&#xff0c;仅以此文章献给最爱的粉丝&#xff0c;希望对各位大师有些…

vue3 之 商城项目—结算模块

路由配置 chekout/index.vue <script setup> const checkInfo {} // 订单对象 const curAddress {} // 地址对象 </script> <template><div class"xtx-pay-checkout-page"><div class"container"><div class"w…

lenovo联想小新 Pro14 2022 Intel版IAH5R,AMD版ARH5R(82UT),(82UU)原装出厂Win11系统带恢复重置功能

联想小新Pro14笔记本电脑恢复原厂OEM预装Windows11系统镜像包&#xff0c;开箱状态时一模一样 Intel版(82UT)链接&#xff1a;https://pan.baidu.com/s/1jexQXkC6TerUnDQffwLooA?pwdcc09 提取码&#xff1a;cc09 AMD锐龙版(82UU)链接&#xff1a;https://pan.baidu.com/s/…

【FPGA开发】HDMI通信协议解析及FPGA实现

本篇文章包含的内容 一、HDMI简介1.1 HDMI引脚解析1.2 HDMI工作原理1.3 DVI编码1.4 TMDS编码 二、并串转换、单端差分转换原语2.1 原语简介2.2 IO端口组件 笔者在这里使用的开发板是正点原子的达芬奇开发板&#xff0c;FPGA型号为XC7A35TFGG484-2。参考的课程是正点原子的课程手…

Calendar的使用(Java)

直接从需求来理解&#xff1a;将2024年2月16日增加一个月 如果不使用Calendar的话&#xff0c;我们需要定义字符串记住这个日期&#xff0c;然后把字符串解析成Date日期对象&#xff0c;通过Date日期对象获取其毫秒值&#xff0c;然后增加一个月的毫秒值&#xff0c;再格式化时…

蓝桥杯第十四届电子类单片机组程序设计

目录 前言 蓝桥杯大赛历届真题&#xff08;点击查看&#xff09; 一、第十四届比赛题目 1.比赛原题 2.题目解读 1&#xff09;任务要求 2&#xff09;注意事项 二、任务实现 1.NE555读取时机的问题 1&#xff09;缩短计数时间 2&#xff09;实时读取 2.温度传感器读…

社区养老|社区养老服务系统|基于springboot社区养老服务系统设计与实现(源码+数据库+文档)

社区养老服务系统目录 目录 基于springboot社区养老服务系统设计与实现 一、前言 二、系统功能设计 三、系统实现 1、管理员部分功能 &#xff08;1&#xff09; 用户管理 &#xff08;2&#xff09;服务种类管理 &#xff08;3&#xff09;社区服务管理 &#xff08…

MySQL 基础知识(八)之用户权限管理

目录 1 MySQL 权限管理概念 2 用户管理 2.1 创建用户 2.2 查看当前登录用户 2.3 修改用户名 2.4 删除用户 3 授予权限 3.1 授予用户管理员权限 3.2 授予用户数据库权限 3.3 授予用户表权限 3.4 授予用户列权限 4 查询权限 5 回收权限 1 MySQL 权限管理概念 关于 M…

NodeJS背后的人:Express

NodeJS背后的人&#xff1a;Express 本篇文章&#xff0c;学习记录于&#xff1a;尚硅谷&#x1f3a2; 文章简单学习总结&#xff1a;如有错误 大佬 &#x1f449;点. 前置知识&#xff1a;需要掌握了解&#xff1a; JavaScript基础语法 、Node.JS环境API 、前端工程\模块化 …

ESP32学习(3)——连接WIFI

1.简介 Wi-Fi是基于IEEE 802.11标准的无线网络技术 让联网设备以无线电波的形式&#xff0c;加入采用TCP/IP通信协议的网络. Wi-Fi设备有两种模式&#xff1a; 1.Access Point(AP) 模式&#xff0c;此为无线接入点&#xff0c;家里的光猫就是结合WiFi和internet路由功能的AP。…

Javaweb之SpringBootWeb案例之AOP通知类型的详细解析

3.1 通知类型 在入门程序当中&#xff0c;我们已经使用了一种功能最为强大的通知类型&#xff1a;Around环绕通知。 Around("execution(* com.itheima.service.*.*(..))") public Object recordTime(ProceedingJoinPoint pjp) throws Throwable {//记录方法执行开始…

I/O并行口直接驱动LED显示

1&#xff0e;  实验任务 如图所示&#xff0c;利用AT89S51单片机的P0端口的P0.0&#xff0d;P0.7连接到一个共阴数码管的a&#xff0d;h的笔段上&#xff0c;数码管的公共端接地。在数码管上循环显示0&#xff0d;9数字&#xff0c;时间间隔0.2秒。 2&#xff0e;  电路原…

第7章 Page449~451 7.8.9智能指针 std::shared_ptr

“shared_ptr”是“共享式智能指针”。 即多个“shared_ptr”之间可以管理同一个裸指针。于是 O* o new O; //一个裸指针 std::shared_ptr <O> p1(o); //交给p1管 std::shared_ptr <O> p2(o); //又交给p2管 出乎意料&#xff0c;以上代码仍然是可以通过编译但运…

IO流---缓冲流,转换流,打印流,序列化流

缓冲流 缓冲流&#xff08;Buffered Stream&#xff09;也被称为高效流&#xff0c;它是对基本的字节字符流进行增强的一种流。通过缓冲流&#xff0c;可以提高数据的读写能力。 在创建缓冲流对象时&#xff0c;会创建一个内置的默认大小的缓冲区数组。通过对缓冲区的读写&…
最新文章