
Notes on Reading the Source Code of pandas.merge()

· 12 min read

The pandas.merge() function

pandas/core/reshape/merge.py#L45-L54

@Substitution('\nleft : DataFrame')
@Appender(_merge_doc, indents=0)
def merge(left, right, how='inner', on=None, left_on=None, right_on=None,
          left_index=False, right_index=False, sort=False,
          suffixes=('_x', '_y'), copy=True, indicator=False):
    op = _MergeOperation(left, right, how=how, on=on, left_on=left_on,
                         right_on=right_on, left_index=left_index,
                         right_index=right_index, sort=sort, suffixes=suffixes,
                         copy=copy, indicator=indicator)
    return op.get_result()

As we can see, the body of pandas.merge() does nothing more than construct a _MergeOperation object and return that operation's result via get_result(). The core of the merge operation should therefore live in the _MergeOperation class.
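For reference, here is what a call to the public API looks like, so the parameters above have a concrete face (the column names are made up for illustration):

import pandas as pd

left = pd.DataFrame({'key': ['a', 'b', 'c'], 'lval': [1, 2, 3]})
right = pd.DataFrame({'key': ['b', 'c', 'd'], 'rval': [4, 5, 6]})

# inner join on the shared column 'key'; overlapping non-key columns
# would receive the '_x' / '_y' suffixes
result = pd.merge(left, right, how='inner', on='key')
print(result)
#   key  lval  rval
# 0   b     2     4
# 1   c     3     5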

The two decorators, @Substitution and @Appender, come from the pandas.util._decorators module and are used to assemble the docstring. Since they are not the focus of this source reading, I won't go into them here.

Aside: even the documentation strings need the decorator machinery to be put together. Should we applaud Python's dynamic nature here, or complain about what it costs in performance?
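Just to illustrate the idea (this is only a sketch of mine, not pandas' actual implementation of Appender/Substitution), a docstring-appending decorator can be as small as:

# minimal sketch of a docstring-appending decorator; the real helpers in
# pandas.util._decorators are more elaborate
def appender(addendum):
    def decorate(func):
        func.__doc__ = (func.__doc__ or '') + addendum
        return func
    return decorate

@appender("\nExtra notes appended at import time.")
def demo():
    """Original docstring."""

print(demo.__doc__)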

_MergeOperation

The whole class is defined in pandas/core/reshape/merge.py#L491-L953.

The constructor (the __init__ method)

pandas/core/reshape/merge.py#L491-L562

class _MergeOperation(object):
    """
    Perform a database (SQL) merge operation between two DataFrame objects
    using either columns as keys or their row indexes
    """
    _merge_type = 'merge'

    def __init__(self, left, right, how='inner', on=None,
                 left_on=None, right_on=None, axis=1,
                 left_index=False, right_index=False, sort=True,
                 suffixes=('_x', '_y'), copy=True, indicator=False):
        self.left = self.orig_left = left
        self.right = self.orig_right = right
        self.how = how
        self.axis = axis

        self.on = com._maybe_make_list(on)
        self.left_on = com._maybe_make_list(left_on)
        self.right_on = com._maybe_make_list(right_on)

        self.copy = copy
        self.suffixes = suffixes
        self.sort = sort

        self.left_index = left_index
        self.right_index = right_index

        self.indicator = indicator

        if isinstance(self.indicator, compat.string_types):
            self.indicator_name = self.indicator
        elif isinstance(self.indicator, bool):
            self.indicator_name = '_merge' if self.indicator else None
        else:
            raise ValueError(
                'indicator option can only accept boolean or string arguments')

        if not isinstance(left, DataFrame):
            raise ValueError(
                'can not merge DataFrame with instance of '
                'type {0}'.format(type(left)))
        if not isinstance(right, DataFrame):
            raise ValueError(
                'can not merge DataFrame with instance of '
                'type {0}'.format(type(right)))

        if not is_bool(left_index):
            raise ValueError(
                'left_index parameter must be of type bool, not '
                '{0}'.format(type(left_index)))
        if not is_bool(right_index):
            raise ValueError(
                'right_index parameter must be of type bool, not '
                '{0}'.format(type(right_index)))

        # warn user when merging between different levels
        if left.columns.nlevels != right.columns.nlevels:
            msg = ('merging between different levels can give an unintended '
                   'result ({0} levels on the left, {1} on the right)')
            msg = msg.format(left.columns.nlevels, right.columns.nlevels)
            warnings.warn(msg, UserWarning)

        self._validate_specification()

        # note this function has side effects
        (self.left_join_keys,
         self.right_join_keys,
         self.join_names) = self._get_merge_keys()

        # validate the merge keys dtypes. We may need to coerce
        # to avoid incompat dtypes
        self._maybe_coerce_merge_keys()

The first large chunk of __init__() is devoted to assigning the object's data members, with type checks on quite a few parameters so they can be special-cased or rejected with an error. The on, left_on and right_on arguments are passed through com._maybe_make_list(); com is an alias for the pandas.core.common module, and _maybe_make_list() simply wraps a single value that is not a list/tuple into a one-element list. After all the assignments, checks and error handling, we finally reach the first method defined on the class: _validate_specification().
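Its behavior amounts to something like the following simplified sketch (mine, not the exact pandas source):

def _maybe_make_list(obj):
    # wrap a single value into a one-element list; leave lists/tuples (and None) alone
    if obj is not None and not isinstance(obj, (tuple, list)):
        return [obj]
    return obj

_maybe_make_list('key')          # ['key']
_maybe_make_list(['k1', 'k2'])   # ['k1', 'k2'] unchanged
_maybe_make_list(None)           # None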

Validating the merge arguments: the _validate_specification() method

pandas/core/reshape/merge.py#L911-L953

class _MergeOperation(object):
    # ...

    def _validate_specification(self):
        # Hm, any way to make this logic less complicated??
        if self.on is None and self.left_on is None and self.right_on is None:

            if self.left_index and self.right_index:
                self.left_on, self.right_on = (), ()
            elif self.left_index:
                if self.right_on is None:
                    raise MergeError('Must pass right_on or right_index=True')
            elif self.right_index:
                if self.left_on is None:
                    raise MergeError('Must pass left_on or left_index=True')
            else:
                # use the common columns
                common_cols = self.left.columns.intersection(
                    self.right.columns)
                if len(common_cols) == 0:
                    raise MergeError('No common columns to perform merge on')
                if not common_cols.is_unique:
                    raise MergeError("Data columns not unique: %s"
                                     % repr(common_cols))
                self.left_on = self.right_on = common_cols
        elif self.on is not None:
            if self.left_on is not None or self.right_on is not None:
                raise MergeError('Can only pass argument "on" OR "left_on" '
                                 'and "right_on", not a combination of both.')
            self.left_on = self.right_on = self.on
        elif self.left_on is not None:
            n = len(self.left_on)
            if self.right_index:
                if len(self.left_on) != self.right.index.nlevels:
                    raise ValueError('len(left_on) must equal the number '
                                     'of levels in the index of "right"')
                self.right_on = [None] * n
        elif self.right_on is not None:
            n = len(self.right_on)
            if self.left_index:
                if len(self.right_on) != self.left.index.nlevels:
                    raise ValueError('len(right_on) must equal the number '
                                     'of levels in the index of "left"')
                self.left_on = [None] * n
        if len(self.right_on) != len(self.left_on):
            raise ValueError("len(right_on) must equal len(left_on)")

It opens with a grumble in the comments. The whole validation centers on the merge key arguments: on, left_on and right_on.

  • None of on, left_on and right_on is given:
    • left_index and/or right_index given? If only one side uses its index, the other side must explicitly name its key columns (or also use its index);
    • Neither index flag given either? Fall back to the columns the two DataFrames have in common.
  • on is given:
    • left_on or right_on given as well? Not allowed; raise an error.
  • Key columns on one side are matched against the index on the other side:
    • Does the number of key columns equal the number of index levels? At the very end, the method checks once more that both sides have the same number of key columns.

The logic as a whole really isn't that complicated; fiddly would be a better word.
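A quick demo of these rules through the public API (toy data of my own):

import pandas as pd

left = pd.DataFrame({'key': [1, 2], 'a': ['x', 'y']})
right = pd.DataFrame({'key': [2, 3], 'b': ['u', 'v']})

# no on/left_on/right_on and no index flags: falls back to the common column 'key'
pd.merge(left, right)

# combining 'on' with 'left_on'/'right_on' is rejected
try:
    pd.merge(left, right, on='key', left_on='a')
except pd.errors.MergeError as e:
    print(e)  # Can only pass argument "on" OR "left_on" and "right_on" ...

# matching a column on one side against the index on the other:
# len(left_on) must equal the number of index levels of 'right'
pd.merge(left, right.set_index('key'), left_on='key', right_index=True)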

The _get_merge_keys() method

pandas/core/reshape/merge.py#L755-L864

class _MergeOperation(object):
    # ...

    def _get_merge_keys(self):
        """
        Note: has side effects (copy/delete key columns)
        Parameters
        ----------
        left
        right
        on
        Returns
        -------
        left_keys, right_keys
        """
        left_keys = []
        right_keys = []
        join_names = []
        right_drop = []
        left_drop = []
        left, right = self.left, self.right

        is_lkey = lambda x: isinstance(
            x, (np.ndarray, Series)) and len(x) == len(left)
        is_rkey = lambda x: isinstance(
            x, (np.ndarray, Series)) and len(x) == len(right)

        # Note that pd.merge_asof() has separate 'on' and 'by' parameters. A
        # user could, for example, request 'left_index' and 'left_by'. In a
        # regular pd.merge(), users cannot specify both 'left_index' and
        # 'left_on'. (Instead, users have a MultiIndex). That means the
        # self.left_on in this function is always empty in a pd.merge(), but
        # a pd.merge_asof(left_index=True, left_by=...) will result in a
        # self.left_on array with a None in the middle of it. This requires
        # a work-around as designated in the code below.
        # See _validate_specification() for where this happens.

        # ugh, spaghetti re #733
        if _any(self.left_on) and _any(self.right_on):
            for lk, rk in zip(self.left_on, self.right_on):
                if is_lkey(lk):
                    left_keys.append(lk)
                    if is_rkey(rk):
                        right_keys.append(rk)
                        join_names.append(None)  # what to do?
                    else:
                        if rk is not None:
                            right_keys.append(right[rk]._values)
                            join_names.append(rk)
                        else:
                            # work-around for merge_asof(right_index=True)
                            right_keys.append(right.index)
                            join_names.append(right.index.name)
                else:
                    if not is_rkey(rk):
                        if rk is not None:
                            right_keys.append(right[rk]._values)
                        else:
                            # work-around for merge_asof(right_index=True)
                            right_keys.append(right.index)
                        if lk is not None and lk == rk:
                            # avoid key upcast in corner case (length-0)
                            if len(left) > 0:
                                right_drop.append(rk)
                            else:
                                left_drop.append(lk)
                    else:
                        right_keys.append(rk)
                    if lk is not None:
                        left_keys.append(left[lk]._values)
                        join_names.append(lk)
                    else:
                        # work-around for merge_asof(left_index=True)
                        left_keys.append(left.index)
                        join_names.append(left.index.name)
        elif _any(self.left_on):
            for k in self.left_on:
                if is_lkey(k):
                    left_keys.append(k)
                    join_names.append(None)
                else:
                    left_keys.append(left[k]._values)
                    join_names.append(k)
            if isinstance(self.right.index, MultiIndex):
                right_keys = [lev._values.take(lab)
                              for lev, lab in zip(self.right.index.levels,
                                                  self.right.index.labels)]
            else:
                right_keys = [self.right.index.values]
        elif _any(self.right_on):
            for k in self.right_on:
                if is_rkey(k):
                    right_keys.append(k)
                    join_names.append(None)
                else:
                    right_keys.append(right[k]._values)
                    join_names.append(k)
            if isinstance(self.left.index, MultiIndex):
                left_keys = [lev._values.take(lab)
                             for lev, lab in zip(self.left.index.levels,
                                                 self.left.index.labels)]
            else:
                left_keys = [self.left.index.values]

        if left_drop:
            self.left = self.left.drop(left_drop, axis=1)

        if right_drop:
            self.right = self.right.drop(right_drop, axis=1)

        return left_keys, right_keys, join_names
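Note the is_lkey/is_rkey lambdas above: a join key does not have to be a column label, it can also be an ndarray/Series whose length matches the corresponding frame. Through the public API this looks like (toy data of my own):

import numpy as np
import pandas as pd

left = pd.DataFrame({'a': ['x', 'y', 'z']})
right = pd.DataFrame({'key': [0, 2], 'b': ['u', 'v']})

# an ndarray with len(left) elements is accepted directly as the left join key
pd.merge(left, right, left_on=np.array([0, 1, 2]), right_on='key')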

The _maybe_coerce_merge_keys() method

pandas/core/reshape/merge.py#L866-L909


    def _maybe_coerce_merge_keys(self):
        # we have valid mergee's but we may have to further
        # coerce these if they are originally incompatible types
        #
        # for example if these are categorical, but are not dtype_equal
        # or if we have object and integer dtypes

        for lk, rk, name in zip(self.left_join_keys,
                                self.right_join_keys,
                                self.join_names):
            if (len(lk) and not len(rk)) or (not len(lk) and len(rk)):
                continue

            # if either left or right is a categorical
            # then the must match exactly in categories & ordered
            if is_categorical_dtype(lk) and is_categorical_dtype(rk):
                if lk.is_dtype_equal(rk):
                    continue
            elif is_categorical_dtype(lk) or is_categorical_dtype(rk):
                pass

            elif is_dtype_equal(lk.dtype, rk.dtype):
                continue

            # if we are numeric, then allow differing
            # kinds to proceed, eg. int64 and int8
            # further if we are object, but we infer to
            # the same, then proceed
            if (is_numeric_dtype(lk) and is_numeric_dtype(rk)):
                if lk.dtype.kind == rk.dtype.kind:
                    continue

            # let's infer and see if we are ok
            if lib.infer_dtype(lk) == lib.infer_dtype(rk):
                continue

            # Houston, we have a problem!
            # let's coerce to object
            if name in self.left.columns:
                self.left = self.left.assign(
                    **{name: self.left[name].astype(object)})
            if name in self.right.columns:
                self.right = self.right.assign(
                    **{name: self.right[name].astype(object)})
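Observed from the public API, this means for example that keys of differing numeric widths merge fine (same dtype.kind), while incompatible key dtypes fall into the "Houston, we have a problem" branch. A small demo with made-up data:

import numpy as np
import pandas as pd

left = pd.DataFrame({'key': np.array([1, 2, 3], dtype='int64'), 'a': list('xyz')})
right = pd.DataFrame({'key': np.array([2, 3, 4], dtype='int8'), 'b': list('uvw')})

# differing numeric widths are allowed: both keys have dtype.kind == 'i'
print(pd.merge(left, right, on='key'))

# an int key merged against an object key is the coerce-to-object case in the
# version read here; note that newer pandas versions reject such merges instead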

Getting the merge result (the get_result() method)

class _MergeOperation(object):
    # ...

    def get_result(self):
        if self.indicator:
            self.left, self.right = self._indicator_pre_merge(
                self.left, self.right)

        join_index, left_indexer, right_indexer = self._get_join_info()

        ldata, rdata = self.left._data, self.right._data
        lsuf, rsuf = self.suffixes

        llabels, rlabels = items_overlap_with_suffix(ldata.items, lsuf,
                                                     rdata.items, rsuf)

        lindexers = {1: left_indexer} if left_indexer is not None else {}
        rindexers = {1: right_indexer} if right_indexer is not None else {}

        result_data = concatenate_block_managers(
            [(ldata, lindexers), (rdata, rindexers)],
            axes=[llabels.append(rlabels), join_index],
            concat_axis=0, copy=self.copy)

        typ = self.left._constructor
        result = typ(result_data).__finalize__(self, method=self._merge_type)

        if self.indicator:
            result = self._indicator_post_merge(result)

        self._maybe_add_join_keys(result, left_indexer, right_indexer)

        return result

Marking each result row's origin: the _indicator_pre_merge() and _indicator_post_merge() methods

_indicator_pre_merge() first checks whether the column names used internally to mark a row's origin (left or right), _left_indicator and _right_indicator, are already taken. If so, it raises an error; otherwise the left-side marker is set to 1 and the right-side marker to 2, and both columns are cast to int8 to save memory (the default would presumably be int64, possibly depending on the machine architecture).

pandas/core/reshape/merge.py#L595-L616

    def _indicator_pre_merge(self, left, right):

        columns = left.columns.union(right.columns)

        for i in ['_left_indicator', '_right_indicator']:
            if i in columns:
                raise ValueError("Cannot use `indicator=True` option when "
                                 "data contains a column named {}".format(i))
        if self.indicator_name in columns:
            raise ValueError(
                "Cannot use name of an existing column for indicator column")

        left = left.copy()
        right = right.copy()

        left['_left_indicator'] = 1
        left['_left_indicator'] = left['_left_indicator'].astype('int8')

        right['_right_indicator'] = 2
        right['_right_indicator'] = right['_right_indicator'].astype('int8')

        return left, right

The clean-up in _indicator_post_merge() is simple. Although the merged result contains both the _left_indicator and _right_indicator columns, depending on the merge type (inner, left, right, outer) some rows will inevitably have NaN in one of them. So the NaNs are first replaced with 0, then the two columns are summed and converted into a categorical value (left_only, right_only, both), which becomes the final origin marker, and the two temporary columns are dropped. (An end-to-end example follows the code below.)

pandas/core/reshape/merge.py#L618-L632

    def _indicator_post_merge(self, result):

        result['_left_indicator'] = result['_left_indicator'].fillna(0)
        result['_right_indicator'] = result['_right_indicator'].fillna(0)

        result[self.indicator_name] = Categorical((result['_left_indicator'] +
                                                   result['_right_indicator']),
                                                  categories=[1, 2, 3])
        result[self.indicator_name] = (
            result[self.indicator_name]
            .cat.rename_categories(['left_only', 'right_only', 'both']))

        result = result.drop(labels=['_left_indicator', '_right_indicator'],
                             axis=1)
        return result
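Seen from the outside, these two helpers are what produce the familiar indicator column (toy data of my own):

import pandas as pd

left = pd.DataFrame({'key': [1, 2], 'a': ['x', 'y']})
right = pd.DataFrame({'key': [2, 3], 'b': ['u', 'v']})

print(pd.merge(left, right, on='key', how='outer', indicator=True))
#    key    a    b      _merge
# 0    1    x  NaN   left_only
# 1    2    y    u        both
# 2    3  NaN    v  right_only

# passing a string uses it as the indicator column name instead of '_merge'
pd.merge(left, right, on='key', how='outer', indicator='source')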

pandas/core/reshape/merge.py#L708-L753

    def _get_join_indexers(self):
        """ return the join indexers """
        return _get_join_indexers(self.left_join_keys,
                                  self.right_join_keys,
                                  sort=self.sort,
                                  how=self.how)

    def _get_join_info(self):
        left_ax = self.left._data.axes[self.axis]
        right_ax = self.right._data.axes[self.axis]

        if self.left_index and self.right_index and self.how != 'asof':
            join_index, left_indexer, right_indexer = \
                left_ax.join(right_ax, how=self.how, return_indexers=True,
                             sort=self.sort)
        elif self.right_index and self.how == 'left':
            join_index, left_indexer, right_indexer = \
                _left_join_on_index(left_ax, right_ax, self.left_join_keys,
                                    sort=self.sort)

        elif self.left_index and self.how == 'right':
            join_index, right_indexer, left_indexer = \
                _left_join_on_index(right_ax, left_ax, self.right_join_keys,
                                    sort=self.sort)
        else:
            (left_indexer,
             right_indexer) = self._get_join_indexers()

            if self.right_index:
                if len(self.left) > 0:
                    join_index = self.left.index.take(left_indexer)
                else:
                    join_index = self.right.index.take(right_indexer)
                    left_indexer = np.array([-1] * len(join_index))
            elif self.left_index:
                if len(self.right) > 0:
                    join_index = self.right.index.take(right_indexer)
                else:
                    join_index = self.left.index.take(left_indexer)
                    right_indexer = np.array([-1] * len(join_index))
            else:
                join_index = Index(np.arange(len(left_indexer)))

            if len(join_index) == 0:
                join_index = join_index.astype(object)
        return join_index, left_indexer, right_indexer

    def _maybe_add_join_keys(self, result, left_indexer, right_indexer):

        left_has_missing = None
        right_has_missing = None

        keys = zip(self.join_names, self.left_on, self.right_on)
        for i, (name, lname, rname) in enumerate(keys):
            if not _should_fill(lname, rname):
                continue

            take_left, take_right = None, None

            if name in result:

                if left_indexer is not None and right_indexer is not None:
                    if name in self.left:

                        if left_has_missing is None:
                            left_has_missing = (left_indexer == -1).any()

                        if left_has_missing:
                            take_right = self.right_join_keys[i]

                            if not is_dtype_equal(result[name].dtype,
                                                  self.left[name].dtype):
                                take_left = self.left[name]._values

                    elif name in self.right:

                        if right_has_missing is None:
                            right_has_missing = (right_indexer == -1).any()

                        if right_has_missing:
                            take_left = self.left_join_keys[i]

                            if not is_dtype_equal(result[name].dtype,
                                                  self.right[name].dtype):
                                take_right = self.right[name]._values

            elif left_indexer is not None \
                    and isinstance(self.left_join_keys[i], np.ndarray):

                take_left = self.left_join_keys[i]
                take_right = self.right_join_keys[i]

            if take_left is not None or take_right is not None:

                if take_left is None:
                    lvals = result[name]._values
                else:
                    lfill = na_value_for_dtype(take_left.dtype)
                    lvals = algos.take_1d(take_left, left_indexer,
                                          fill_value=lfill)

                if take_right is None:
                    rvals = result[name]._values
                else:
                    rfill = na_value_for_dtype(take_right.dtype)
                    rvals = algos.take_1d(take_right, right_indexer,
                                          fill_value=rfill)

                # if we have an all missing left_indexer
                # make sure to just use the right values
                mask = left_indexer == -1
                if mask.all():
                    key_col = rvals
                else:
                    key_col = Index(lvals).where(~mask, rvals)

                if name in result:
                    result[name] = key_col
                else:
                    result.insert(i, name or 'key_%d' % i, key_col)

The concatenate_block_managers() function

pandas/core/internals.py#L4813-L4833

def concatenate_block_managers(mgrs_indexers, axes, concat_axis, copy):
    """
    Concatenate block managers into one.

    Parameters
    ----------
    mgrs_indexers : list of (BlockManager, {axis: indexer,...}) tuples
    axes : list of Index
    concat_axis : int
    copy : bool

    """
    concat_plan = combine_concat_plans(
        [get_mgr_concatenation_plan(mgr, indexers)
         for mgr, indexers in mgrs_indexers], concat_axis)

    blocks = [make_block(
        concatenate_join_units(join_units, concat_axis, copy=copy),
        placement=placement) for placement, join_units in concat_plan]

    return BlockManager(blocks, axes)