# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
from typing import Iterable, Optional, Tuple, Union
from ..core._imperative_rt.core2 import apply
from ..core.ops import builtin
from ..core.tensor import megbrain_graph, utils
from ..core.tensor.utils import astensor1d
from ..jit.tracing import is_tracing
from ..tensor import Tensor
from .elemwise import floor
from .math import argsort
from .tensor import broadcast_to, concat, expand_dims, reshape
def cvt_color(inp: Tensor, mode: str = ""):
    r"""
    Convert images from one color format to another.

    :param inp: input images.
    :param mode: name of the conversion to perform. It has to be the name of a
        member of ``builtin.CvtColor.Mode`` (for instance ``"RGB2GRAY"``);
        any other value fails an assertion.
    :return: convert result.

    Examples:

    .. testcode::

        import numpy as np
        import megengine as mge
        import megengine.functional as F

        x = mge.tensor(np.array([[[[-0.58675045, 1.7526233, 0.10702174]]]]).astype(np.float32))
        y = F.vision.cvt_color(x, mode="RGB2GRAY")
        print(y.numpy())

    Outputs:

    .. testoutput::

        [[[[0.86555195]]]]

    """
    mode_enum = builtin.CvtColor.Mode
    # First make sure the name exists on the enum class at all ...
    assert mode in mode_enum.__dict__, "unspport mode for cvt_color"
    selected = getattr(mode_enum, mode)
    # ... then make sure it resolved to an enum member and not to some other
    # class attribute that merely shares the namespace.
    assert isinstance(selected, mode_enum)
    (converted,) = apply(builtin.CvtColor(mode=selected), inp)
    return converted
def roi_pooling(
    inp: Tensor,
    rois: Tensor,
    output_shape: Union[int, tuple, list],
    mode: str = "max",
    scale: float = 1.0,
) -> Tensor:
    """
    Pools every region of interest of the input feature to a fixed spatial size.

    :param inp: tensor that represents the input feature, `(N, C, H, W)` images.
    :param rois: `(K, 5)` boxes. First column is the index into N. The other 4 columns are xyxy.
    :param output_shape: `(height, width)` of output rois feature. A single int
        is used for both height and width.
    :param mode: "max" or "average", the reduction used inside each bin,
        just like max/average pooling. Default: "max"
    :param scale: scale the input boxes by this number. Default: 1.0
    :return: `(K, C, output_shape[0], output_shape[1])` feature of rois.

    Examples:

    .. testcode::

        import numpy as np
        from megengine import tensor
        import megengine.functional as F

        np.random.seed(42)
        inp = tensor(np.random.randn(1, 1, 128, 128))
        rois = tensor(np.random.random((4, 5)))
        y = F.vision.roi_pooling(inp, rois, (2, 2))
        print(y.numpy()[0].round(decimals=4))

    Outputs:

    .. testoutput::

        [[[-0.1383 -0.1383]
          [-0.5035 -0.5035]]]

    """
    assert mode in ["max", "average"], "only max/average mode is supported"
    if isinstance(output_shape, int):
        # A scalar size means a square output.
        output_shape = (output_shape, output_shape)

    pooling_op = builtin.ROIPooling(mode=mode, scale=scale)
    inp, rois = utils.convert_inputs(inp, rois)
    # The target size is handed to the op as a third, int32 tensor input
    # living on the same device as the feature map.
    shape_tensor = Tensor(output_shape, dtype="int32", device=inp.device)
    # The op produces two outputs; only the first one is returned.
    pooled, _ = apply(pooling_op, inp, rois, shape_tensor)
    return pooled
def roi_align(
    inp: Tensor,
    rois: Tensor,
    output_shape: Union[int, tuple, list],
    mode: str = "average",
    spatial_scale: float = 1.0,
    sample_points: Union[int, tuple, list] = 2,
    aligned: bool = True,
) -> Tensor:
    """
    Applies roi align on input feature.

    :param inp: tensor that represents the input feature, shape is `(N, C, H, W)`.
    :param rois: `(K, 5)` boxes, one row per box. First column is the box index.
        The other 4 columns are ``xyxy``.
    :param output_shape: `(height, width)` shape of output rois feature.
        A single int is used for both height and width.
    :param mode: "max" or "average", use max/average align just like max/average pooling. Default: "average"
    :param spatial_scale: scale the input boxes by this number. Default: 1.0
    :param sample_points: number of inputs samples to take for each output sample.
        0 to take samples densely. A single int is used for both directions. Default: 2
    :param aligned: whether to align the input feature, with `aligned=True`,
        we first appropriately scale the ROI and then shift it by -0.5. Default: True
    :return: output tensor.

    Examples:

    .. testcode::

        import numpy as np
        from megengine import tensor
        import megengine.functional as F

        np.random.seed(42)
        inp = tensor(np.random.randn(1, 1, 128, 128))
        rois = tensor(np.random.random((4, 5)))
        y = F.vision.roi_align(inp, rois, (2, 2))
        print(y.numpy()[0].round(decimals=4))

    Outputs:

    .. testoutput::

        [[[0.175  0.175 ]
          [0.1359 0.1359]]]

    """
    assert mode in ["max", "average"], "only max/average mode is supported"

    # Scalars are shorthand for "same value along both axes".
    if isinstance(output_shape, int):
        output_shape = (output_shape, output_shape)
    if isinstance(sample_points, int):
        sample_points = (sample_points, sample_points)
    pooled_height, pooled_width = output_shape
    sample_height, sample_width = sample_points

    # `aligned` is expressed to the op as a half-pixel offset.
    if aligned:
        offset = 0.5
    else:
        offset = 0.0

    op_kwargs = dict(
        mode=mode,
        format="NCHW",
        spatial_scale=spatial_scale,
        offset=offset,
        pooled_height=pooled_height,
        pooled_width=pooled_width,
        sample_height=sample_height,
        sample_width=sample_width,
    )
    align_op = builtin.ROIAlign(**op_kwargs)
    inp, rois = utils.convert_inputs(inp, rois)
    # Only the first output of the op is of interest to callers.
    outputs = apply(align_op, inp, rois)
    result, *_ = outputs
    return result
def nms(
    boxes: Tensor, scores: Tensor, iou_thresh: float, max_output: Optional[int] = None
) -> Tensor:
    r"""
    Performs non-maximum suppression (NMS) on the boxes according to their intersection-over-union(IoU).

    :param boxes: tensor of shape `(N, 4)`; the boxes to perform nms on; each box is expected to be in `(x1, y1, x2, y2)` format.
    :param scores: tensor of shape `(N,)`, the score of boxes.
    :param iou_thresh: IoU threshold for overlapping.
    :param max_output: the maximum number of boxes to keep; it is optional if this operator is not traced,
        otherwise it is required to be specified; if it is not specified, all boxes are kept.
    :return: indices of the elements that have been kept by NMS.

    Examples:

    .. testcode::

        import numpy as np
        from megengine import tensor
        import megengine.functional as F

        x = np.zeros((100,4))
        np.random.seed(42)
        x[:,:2] = np.random.rand(100,2)*20
        x[:,2:] = np.random.rand(100,2)*20 + 100
        scores = tensor(np.random.rand(100))
        inp = tensor(x)
        result = F.vision.nms(inp, scores, iou_thresh=0.7)
        print(result.numpy())

    Outputs:

    .. testoutput::

        [75 69]

    """
    assert (
        boxes.ndim == 2 and boxes.shape[1] == 4
    ), "the expected shape of boxes is (N, 4)"
    assert scores.ndim == 1, "the expected shape of scores is (N,)"
    assert (
        boxes.shape[0] == scores.shape[0]
    ), "number of boxes and scores are not matched"

    boxes = boxes.detach()
    scores = scores.detach()

    # The op is fed boxes ordered from highest to lowest score; `order`
    # is kept to translate its answer back to the caller's indexing.
    order = argsort(scores, descending=True)
    ranked_boxes = boxes[order]

    if is_tracing():
        assert (
            max_output is not None and max_output > 0
        ), "max_output should be specified under tracing"

    if max_output is None:
        # No cap requested: allow every box to survive.
        max_output = ranked_boxes.shape[0]

    keep_op = builtin.NMSKeep(iou_thresh, max_output)
    # The op receives the boxes with a leading dimension of size one.
    converted = utils.convert_inputs(ranked_boxes.reshape(1, -1, 4))
    indices, count = apply(keep_op, *converted)
    # Take row 0 and only its first `count[0]` entries, then map those
    # positions in the sorted order back to indices into the original boxes.
    kept = indices[0][: count[0]]
    return order[kept]
def remap(
    inp: Tensor,
    map_xy: Tensor,
    border_mode: str = "REPLICATE",
    scalar: float = 0.0,
    interp_mode: str = "LINEAR",
) -> Tensor:
    r"""
    Applies remap transformation to batched 2D images.

    Each output pixel is looked up in the input image at the location given
    by the tensor ``map_xy``. The output's H and W are the same as map_xy's H and W.

    :param inp: input image
    :param map_xy: (batch, oh, ow, 2) transformation matrix
    :param border_mode: pixel extrapolation method.
        Default: "REPLICATE". Currently also support "CONSTANT", "REFLECT",
        "REFLECT_101", "WRAP".
    :param scalar: value used in case of a constant border. Default: 0
    :param interp_mode: interpolation methods.
        Default: "LINEAR". Currently only support "LINEAR" mode.
    :return: output tensor.

    Examples:

    .. testcode::

        import numpy as np
        from megengine import tensor
        import megengine.functional as F

        inp_shape = (1, 1, 4, 4)
        inp = tensor(np.arange(16, dtype=np.float32).reshape(inp_shape))
        map_xy_shape = (1, 2, 2, 2)
        map_xy = tensor(np.array([[[1., 0.],[0., 1.]],
                            [[0., 1.],[0., 1.]]],
                             dtype=np.float32).reshape(map_xy_shape))
        out = F.vision.remap(inp, map_xy)
        print(out.numpy())

    Outputs:

    .. testoutput::

        [[[[1. 4.]
           [4. 4.]]]]

    """
    # The op is always configured for NCHW input.
    remap_op = builtin.Remap(
        imode=interp_mode,
        border_type=border_mode,
        format="NCHW",
        scalar=scalar,
    )
    accepted_types = (Tensor, megbrain_graph.VarNode)
    assert isinstance(inp, accepted_types), "inp must be Tensor type"
    outputs = apply(remap_op, inp, map_xy)
    (remapped,) = outputs
    return remapped
def warp_affine(
    inp: Tensor,
    weight: Tensor,
    out_shape,
    border_mode: str = "REPLICATE",
    border_val: float = 0,
    format: str = "NHWC",
    imode: str = "LINEAR",
) -> Tensor:
    """
    Batched affine transform on 2D images.

    :param inp: input image.
    :param weight: weight tensor.
    :param out_shape: output tensor shape. It is converted to a 1-d int32
        tensor on the device of ``inp`` before being passed to the operator.
    :param border_mode: pixel extrapolation method.
        Default: "REPLICATE". Currently "CONSTANT", "REFLECT",
        "REFLECT_101", "ISOLATED", "WRAP", "REPLICATE", "TRANSPARENT" are supported.
    :param border_val: value used in case of a constant border. Default: 0
    :param format: layout of ``inp``. "NHWC" is the default based on historical
        concerns, "NCHW" is also supported. Default: "NHWC".
    :param imode: interpolation methods. Could be "LINEAR", "NEAREST", "CUBIC", "AREA".
        Default: "LINEAR".
    :return: output tensor.

    .. note::

        Here all available options for params are listed,
        however it does not mean that you can use all the combinations.
        On different platforms, different combinations are supported.
    """
    op = builtin.WarpAffine(
        border_mode=border_mode, border_val=border_val, format=format, imode=imode
    )
    # The operator takes the target shape as a tensor input, not an attribute.
    out_shape = utils.astensor1d(out_shape, inp, dtype="int32", device=inp.device)
    (result,) = apply(op, inp, weight, out_shape)
    return result
def warp_perspective(
    inp: Tensor,
    M: Tensor,
    dsize: Union[Tuple[int, int], int, Tensor],
    border_mode: str = "REPLICATE",
    border_val: float = 0.0,
    interp_mode: str = "LINEAR",
) -> Tensor:
    r"""
    Applies perspective transformation to batched 2D images.

    The input images are transformed to the output images by the transformation matrix:

    .. math::
        \text{output}(n, c, h, w) = \text{input} \left( n, c,
            \frac{M_{00}h + M_{01}w + M_{02}}{M_{20}h + M_{21}w + M_{22}},
            \frac{M_{10}h + M_{11}w + M_{12}}{M_{20}h + M_{21}w + M_{22}}
            \right)

    :param inp: input image.
    :param M: `(batch, 3, 3)` transformation matrix.
    :param dsize: `(h, w)` size of the output image.
    :param border_mode: pixel extrapolation method.
        Default: "REPLICATE". Currently also support "CONSTANT", "REFLECT",
        "REFLECT_101", "WRAP".
    :param border_val: value used in case of a constant border. Default: 0
    :param interp_mode: interpolation methods.
        Default: "LINEAR". Currently only support "LINEAR" mode.
    :return: output tensor.

    Note:

    The transformation matrix is the inverse of that used by `cv2.warpPerspective`.

    Examples:

    .. testcode::

        import numpy as np
        from megengine import tensor
        import megengine.functional as F

        inp_shape = (1, 1, 4, 4)
        x = tensor(np.arange(16, dtype=np.float32).reshape(inp_shape))
        M_shape = (1, 3, 3)
        # M defines a translation: dst(1, 1, h, w) = src(1, 1, h+1, w+1)
        M = tensor(np.array([[1., 0., 1.],
                             [0., 1., 1.],
                             [0., 0., 1.]], dtype=np.float32).reshape(M_shape))
        out = F.vision.warp_perspective(x, M, (2, 2))
        print(out.numpy())

    Outputs:

    .. testoutput::

        [[[[ 5.  6.]
           [ 9. 10.]]]]

    """
    # NOTE(review): interpolate() in this module fills row 0 of M with the
    # width scale and row 1 with the height scale, which suggests row 0 acts
    # on the w (x) coordinate; the formula in the docstring pairs M_00 with h.
    # Confirm which convention the operator implements.
    warp_op = builtin.WarpPerspective(
        imode=interp_mode,
        bmode=border_mode,
        format="NCHW",
        border_val=border_val,
    )
    inp, M = utils.convert_inputs(inp, M)
    # The output size travels to the op as a 1-d int32 tensor on inp's device.
    size_tensor = astensor1d(dsize, inp, dtype="int32", device=inp.device)
    outputs = apply(warp_op, inp, M, size_tensor)
    (warped,) = outputs
    return warped
def interpolate(
    inp: Tensor,
    size: Optional[Union[int, Tuple[int, int]]] = None,
    scale_factor: Optional[Union[float, Tuple[float, float]]] = None,
    mode: str = "BILINEAR",
    align_corners: Optional[bool] = None,
) -> Tensor:
    r"""
    Down/up samples the input tensor to either the given size or with the given scale_factor. ``size`` can not coexist with ``scale_factor``.

    :param inp: input tensor. Under "LINEAR" mode an extra axis is inserted at
        position 3 and removed again from the result, so the tensor must be
        3-dimensional; under "BILINEAR" mode it must be 4-dimensional.
    :param size: size of the output tensor. Under "LINEAR" mode it must be a
        single int. Default: None
    :param scale_factor: scaling factor of the output tensor. A single number
        is applied to both spatial axes ("BILINEAR") or to the only axis
        ("LINEAR"); a pair must consist of two floats and is only accepted
        under "BILINEAR" mode. Default: None
    :param mode: interpolation methods, acceptable values are:
        "BILINEAR", "LINEAR" (case-insensitive). Default: "BILINEAR"
    :param align_corners: This only has an effect when `mode`
        is "BILINEAR" or "LINEAR". Geometrically, we consider the pixels of the input
        and output as squares rather than points. If set to ``True``, the input
        and output tensors are aligned by the center points of their corner
        pixels, preserving the values at the corner pixels. If set to ``False``,
        the input and output tensors are aligned by the corner points of their
        corner pixels, and the interpolation uses edge value padding for
        out-of-boundary values, making this operation *independent* of input size.
        ``None`` is treated as ``False``.
    :return: output tensor.
    :raises ValueError: if ``mode`` is unsupported, if the rank of ``inp`` does
        not fit ``mode``, if both or neither of ``size`` and ``scale_factor``
        are given, or if a pair is passed for ``size``/``scale_factor`` under
        "LINEAR" mode.
    :raises AssertionError: if ``scale_factor`` is a sequence that does not
        consist of exactly two floats.

    Examples:

    .. testcode::

        import numpy as np
        from megengine import tensor
        import megengine.functional as F

        x = tensor(np.arange(1, 5, dtype=np.float32).reshape(1, 1, 2, 2))
        out = F.vision.interpolate(x, [4, 4], align_corners=False)
        print(out.numpy())
        out2 = F.vision.interpolate(x, scale_factor=2.)
        np.testing.assert_allclose(out.numpy(), out2.numpy())

    Outputs:

    .. testoutput::

        [[[[1.   1.25 1.75 2.  ]
           [1.5  1.75 2.25 2.5 ]
           [2.5  2.75 3.25 3.5 ]
           [3.   3.25 3.75 4.  ]]]]

    """
    mode = mode.upper()
    if mode not in ["BILINEAR", "LINEAR"]:
        raise ValueError("interpolate only support linear or bilinear mode")
    # Every mode that gets past the check above honours align_corners, so the
    # only normalisation left is turning "unspecified" into False.
    if align_corners is None:
        align_corners = False

    if (
        size is not None
        and scale_factor is None
        and not align_corners
        and mode == "BILINEAR"
        and inp.ndim in [4, 5]
    ):
        # fastpath for interpolate
        op = builtin.Resize(imode="LINEAR", format="NCHW")
        shape = astensor1d(size, inp, dtype="int32", device=inp.device)
        (result,) = apply(op, inp, shape)
        return result

    if mode == "LINEAR":
        # Treat the 1-d signal as an image with a trailing axis of size 1.
        inp = expand_dims(inp, 3)

    if inp.ndim != 4:
        raise ValueError("shape of input tensor must correspond to the operartion mode")

    if size is None:
        if scale_factor is None:
            raise ValueError("scale_factor must not be None when size is None")

        if isinstance(scale_factor, (float, int)):
            scale_factor = float(scale_factor)
            if mode == "LINEAR":
                # The axis added above must keep its size.
                scale_factor = (scale_factor, float(1))
            else:
                scale_factor = (scale_factor, scale_factor)
        else:
            if mode == "LINEAR":
                raise ValueError(
                    "under LINEAR mode, scale_factor can only be single value"
                )

        assert len(scale_factor) == 2, "shape of scale_factor must be equal to (2, )"
        assert isinstance(scale_factor[0], float) and isinstance(
            scale_factor[1], float
        ), "scale_factor must be float type"
        # Output extent along each spatial axis: floor(input extent * factor).
        dsize = tuple(
            floor(
                Tensor(
                    inp.shape[i + 2] * scale_factor[i],
                    dtype="float32",
                    device=inp.device,
                )
            )
            for i in range(2)
        )
        dsize = concat([dsize[0], dsize[1]], axis=0)
    else:
        if scale_factor is not None:
            raise ValueError("scale_factor must be None when size is provided")

        if isinstance(size, int):
            size = (size, 1)
        else:
            if mode == "LINEAR":
                raise ValueError("under LINEAR mode, size can only be single value")
        dsize = size

    oh, ow = dsize[0], dsize[1]
    ih, iw = inp.shape[2], inp.shape[3]

    # Build the first two rows of the 3x3 matrix handed to warp_perspective:
    # row0 carries the width scale, row1 the height scale.
    if align_corners:
        hscale = (ih - 1.0) / (oh - 1.0)
        wscale = 1.0 * iw / ow
        if mode != "LINEAR":
            wscale = (iw - 1.0) / (ow - 1.0)
        row0 = concat(
            [wscale, Tensor([0, 0], dtype="float32", device=inp.device)], axis=0
        ).reshape(1, 3)
        row1 = concat(
            [
                Tensor(0, dtype="float32", device=inp.device),
                hscale,
                Tensor(0, dtype="float32", device=inp.device),
            ],
            axis=0,
        ).reshape(1, 3)
    else:
        hscale = 1.0 * ih / oh
        wscale = 1.0 * iw / ow
        # Without corner alignment each row also gets a translation term of
        # 0.5 * scale - 0.5 in its last column.
        row0 = concat(
            [wscale, Tensor(0, dtype="float32", device=inp.device), 0.5 * wscale - 0.5],
            axis=0,
        ).reshape(1, 3)
        row1 = concat(
            [Tensor(0, dtype="float32", device=inp.device), hscale, 0.5 * hscale - 0.5],
            axis=0,
        ).reshape(1, 3)

    # Shared tail of both branches: append the constant last row and replicate
    # the matrix once per batch element.
    weight = concat(
        [row0, row1, Tensor([[0, 0, 1]], dtype="float32", device=inp.device)],
        axis=0,
    ).reshape(1, 3, 3)
    weight = broadcast_to(weight, (inp.shape[0], 3, 3))

    weight = weight.astype("float32")
    ret = warp_perspective(inp, weight, dsize, interp_mode="LINEAR")
    if mode == "LINEAR":
        # Drop the axis that was inserted before warping.
        ret = reshape(ret, ret.shape[0:3])
    return ret